ivan.engineering
Light Dark

Reactive Extensions and .NET Tasks

Snippets

Table Of Contents

Sequential, onError is called on exception

F#
C#
#r "nuget: FSharp.Control.Reactive"

open System
open System.Threading.Tasks
open System.Reactive.Linq
open FSharp.Control.Reactive

// Source sequence for the examples: built by Observable.interval from a
// one-second TimeSpan.
let obs =
  TimeSpan.FromSeconds 1.
  |> Observable.interval

/// Simulated piece of asynchronous work for the element `x`.
///
/// Prints a "started" line, raises an exception when `x` is 3, otherwise
/// waits 2000 ms, prints a "finished" line and produces `x * x`.
/// The exception is raised inside the `task` block, so callers observe it
/// through the returned task.
let handle (x: int64) : Task<int64> =
  task {
    printfn $"Task %d{x} started"

    // The message is already an interpolated string, so plain `failwith`
    // is enough; `failwithf` is meant for printf-style format + arguments.
    if x = 3L then
      failwith $"I don't like number %d{x}"

    do! Task.Delay 2000
    printfn $"Task %d{x} finished"
    return x * x
  }

/// Subscription that feeds every element of `obs` through `handle` and
/// prints each result, or the error if one is raised.
let sub =
  // FromAsync is given a factory (fun () -> ...) rather than a started task.
  let toInner i = Observable.FromAsync(fun () -> handle i)

  let onNext i = printfn $"onNext: %A{i}"
  let onError error = printfn $"onError: %A{error}"

  obs
  |> Observable.map toInner
  |> Observable.concatInner
  |> Observable.subscribeWithError onNext onError
using System.Reactive.Linq;

// Simulated piece of asynchronous work for the value `i`: logs the start,
// throws for the value 3, otherwise waits two seconds, logs completion and
// produces the square of `i`.
async Task<int> Handle(int i)
{
  const int rejectedValue = 3;
  var workDuration = TimeSpan.FromMilliseconds(2000);

  Console.WriteLine($"Task {i} started");

  if (i == rejectedValue) throw new Exception($"I don't like number {i}");

  await Task.Delay(workDuration);
  Console.WriteLine($"Task {i} finished");

  var squared = i * i;
  return squared;
}

// Maps every tick of the one-second interval to a deferred call to Handle,
// concatenates the resulting inner sequences, and prints each result (or the
// error, if Handle throws).
var sub =
  Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(i => Observable.FromAsync(() => Handle((int)i)))
    .Concat()
    .Subscribe(
      i => Console.WriteLine($"onNext: {i}"),
      error => Console.WriteLine($"onError: {error}")
    );

// Keep the process alive until a line is read from the console.
Console.ReadLine();

// Release the subscription explicitly instead of leaving it to process exit.
sub.Dispose();

Sequential, handling exceptions without calling onError

F#
C#
#r "nuget: FSharp.Control.Reactive"

open System
open System.Threading.Tasks
open System.Reactive.Linq
open FSharp.Control.Reactive

// Source sequence for the example: built by Observable.interval from a
// one-second TimeSpan.
let obs =
  TimeSpan.FromSeconds 1.
  |> Observable.interval

/// Simulated piece of asynchronous work for the element `x`.
///
/// Prints a "started" line, raises an exception when `x` is 3, otherwise
/// waits 2000 ms, prints a "finished" line and produces `x * x`.
/// The exception is raised inside the `task` block, so callers observe it
/// through the returned task.
let handle (x: int64) : Task<int64> =
  task {
    printfn $"Task %d{x} started"

    // The message is already an interpolated string, so plain `failwith`
    // is enough; `failwithf` is meant for printf-style format + arguments.
    if x = 3L then
      failwith $"I don't like number %d{x}"

    do! Task.Delay 2000
    printfn $"Task %d{x} finished"
    return x * x
  }


/// Subscription that runs `handle` for every element of `obs`, turning a
/// failed handler into an `Error` value instead of letting the exception
/// escape the task.
let sub =
  // Runs the handler and converts any exception into an Error carrying the
  // exception message; the failure is also printed where it is caught.
  let runSafely i =
    task {
      try
        let! squared = handle i
        return Ok squared
      with ex ->
        printfn $"Task %d{i} failed: %A{ex}"
        return Error $"Failed to execute handler: %s{ex.Message}"
    }

  // Prints the outcome of one handler run.
  let report (outcome: Result<int64, string>) =
    match outcome with
    | Ok r -> printfn $"Computation succeeded: %d{r}"
    | Error error -> printfn $"Computation failed: %s{error}"

  let onError error = printfn $"onError: %A{error}"

  obs
  |> Observable.map (fun i -> Observable.FromAsync(fun () -> runSafely i))
  |> Observable.concatInner
  |> Observable.subscribeWithError report onError
using System.Reactive.Linq;

// Simulated piece of asynchronous work for the value `i`: logs the start,
// throws for the value 3, otherwise waits two seconds, logs completion and
// produces the square of `i`.
async Task<int> Handle(int i)
{
  Console.WriteLine($"Task {i} started");

  var isRejected = i == 3;
  if (isRejected)
    throw new Exception($"I don't like number {i}");

  var workDuration = TimeSpan.FromMilliseconds(2000);
  await Task.Delay(workDuration);

  Console.WriteLine($"Task {i} finished");
  var squared = i * i;
  return squared;
}

// Maps every tick of the one-second interval to a deferred call to Handle,
// concatenates the resulting inner sequences, and prints each element.
// Exceptions from Handle are caught inside the deferred call, so they reach
// the subscriber as a null element rather than through the error callback.
var sub =
  Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(i => Observable.FromAsync(async () =>
        {
          try
          {
            int? result = await Handle((int)i);
            return result;
          }
          catch (Exception ex)
          {
            // Report the failure here and emit null in place of a result.
            Console.WriteLine($"Task {i} failed: {ex.Message}");
            return null;
          }
        }
      )
    )
    .Concat()
    .Subscribe(
      i => Console.WriteLine($"onNext: {i}"),
      error => Console.WriteLine($"onError: {error}")
    );

// Keep the process alive until a line is read from the console.
Console.ReadLine();

// Release the subscription explicitly instead of leaving it to process exit.
sub.Dispose();