Reactive Extensions and .NET Tasks
Snippets
Table Of Contents
Sequential, onError is called on exception
F#
C#
#r "nuget: FSharp.Control.Reactive"
open System
open System.Threading.Tasks
open System.Reactive.Linq
open FSharp.Control.Reactive
/// Source observable that ticks on a one-second interval.
let obs = TimeSpan.FromSeconds 1.0 |> Observable.interval
/// Simulates a slow unit of work for the value `x`.
/// Logs the start, fails immediately when `x` is 3, otherwise waits two
/// seconds, logs completion and produces `x * x`.
let handle x =
    task {
        printfn $"Task %d{x} started"

        // The input 3 is rejected on purpose to demonstrate a failing task.
        match x with
        | 3L -> failwith $"I don't like number %d{x}"
        | _ -> ()

        do! Task.Delay(TimeSpan.FromSeconds(2.0))
        printfn $"Task %d{x} finished"
        return x * x
    }
/// Subscription that runs `handle` for every tick of `obs`, concatenating the
/// per-tick observables, and prints each result or the error that ends the stream.
let sub =
    // Wraps one `handle` call in an observable so the calls can be concatenated.
    let toTaskObservable (tick: int64) =
        Observable.FromAsync(fun () -> handle tick)

    let onNext (squared: int64) = printfn $"onNext: %A{squared}"
    let onError (error: exn) = printfn $"onError: %A{error}"

    obs
    |> Observable.map toTaskObservable
    |> Observable.concatInner
    |> Observable.subscribeWithError onNext onError
using System.Reactive.Linq;
// Simulates a slow unit of work for the value i.
// Logs the start, throws when i is 3, otherwise waits two seconds,
// logs completion and returns i squared.
async Task<int> Handle(int i)
{
    const int rejectedInput = 3;
    var workDuration = TimeSpan.FromSeconds(2);

    Console.WriteLine($"Task {i} started");

    // The input 3 is rejected on purpose to demonstrate a failing task.
    if (i == rejectedInput)
        throw new Exception($"I don't like number {i}");

    await Task.Delay(workDuration);
    Console.WriteLine($"Task {i} finished");

    var squared = i * i;
    return squared;
}
// Runs Handle for every tick of a one-second interval, concatenating the
// per-tick observables, and prints each result or the error that ends the stream.
// `using var` disposes the subscription when the program leaves scope
// (after ReadLine returns) instead of leaving the IDisposable unreleased.
using var sub =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Select(i => Observable.FromAsync(() => Handle((int)i)))
        .Concat()
        .Subscribe(
            i => Console.WriteLine($"onNext: {i}"),
            error => Console.WriteLine($"onError: {error}"));

// Block the main thread so the subscription keeps running until Enter is pressed.
Console.ReadLine();
Sequential, handling exceptions without calling onError
F#
C#
#r "nuget: FSharp.Control.Reactive"
open System
open System.Threading.Tasks
open System.Reactive.Linq
open FSharp.Control.Reactive
/// Source observable that ticks on a one-second interval.
let obs = TimeSpan.FromSeconds 1.0 |> Observable.interval
/// Simulates a slow unit of work for the value `x`.
/// Logs the start, fails immediately when `x` is 3, otherwise waits two
/// seconds, logs completion and produces `x * x`.
let handle (x: int64) : Task<int64> =
    task {
        printfn $"Task %d{x} started"

        // The input 3 is rejected on purpose to demonstrate a failing task.
        match x with
        | 3L -> failwith $"I don't like number %d{x}"
        | _ -> ()

        do! Task.Delay(TimeSpan.FromSeconds(2.0))
        printfn $"Task %d{x} finished"
        return x * x
    }
/// Subscription that runs `handle` for every tick of `obs` and reports each
/// outcome as a `Result`, so a failing task is delivered through onNext
/// as an `Error` value rather than through onError.
let sub =
    // Runs `handle` for one tick; any exception is logged and turned into `Error`.
    let runGuarded (i: int64) =
        Observable.FromAsync(fun () ->
            task {
                try
                    let! squared = handle i
                    return Ok squared
                with ex ->
                    printfn $"Task %d{i} failed: %A{ex}"
                    return Error $"Failed to execute handler: %s{ex.Message}"
            })

    // Prints the outcome of a single guarded computation.
    let report (outcome: Result<int64, string>) =
        match outcome with
        | Ok r -> printfn $"Computation succeeded: %d{r}"
        | Error error -> printfn $"Computation failed: %s{error}"

    let onError (error: exn) = printfn $"onError: %A{error}"

    obs
    |> Observable.map runGuarded
    |> Observable.concatInner
    |> Observable.subscribeWithError report onError
using System.Reactive.Linq;
// Simulates a slow unit of work for the value i.
// Logs the start, throws when i is 3, otherwise waits two seconds,
// logs completion and returns i squared.
async Task<int> Handle(int i)
{
    const int rejectedInput = 3;
    var workDuration = TimeSpan.FromSeconds(2);

    Console.WriteLine($"Task {i} started");

    // The input 3 is rejected on purpose to demonstrate a failing task.
    if (i == rejectedInput)
        throw new Exception($"I don't like number {i}");

    await Task.Delay(workDuration);
    Console.WriteLine($"Task {i} finished");

    var squared = i * i;
    return squared;
}
// Runs Handle for every tick of a one-second interval and concatenates the
// per-tick observables. Exceptions from Handle are caught inside the wrapped
// task, logged, and replaced by null, so they reach onNext and not onError.
// `using var` disposes the subscription when the program leaves scope
// (after ReadLine returns) instead of leaving the IDisposable unreleased.
using var sub =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Select(i => Observable.FromAsync(async () =>
        {
            try
            {
                // int? lets the failure path below return null.
                int? result = await Handle((int)i);
                return result;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Task {i} failed: {ex.Message}");
                return null;
            }
        }))
        .Concat()
        .Subscribe(
            i => Console.WriteLine($"onNext: {i}"),
            error => Console.WriteLine($"onError: {error}"));

// Block the main thread so the subscription keeps running until Enter is pressed.
Console.ReadLine();