Types¶
FSharp.Control.AsyncRx is built around the interfaces IAsyncDisposable
,
IAsyncObserver
and IAsyncObservable
. This enables a familiar Rx
programming syntax similar to FSharp.Control.Reactive
with the difference that all methods are Async.
type IAsyncDisposable =
abstract member DisposeAsync: unit -> Async<unit>
type IAsyncObserver<'a> =
abstract member OnNextAsync: 'a -> Async<unit>
abstract member OnErrorAsync: exn -> Async<unit>
abstract member OnCompletedAsync: unit -> Async<unit>
type IAsyncObservable<'a> =
abstract member SubscribeAsync: IAsyncObserver<'a> -> Async<IAsyncDisposable>
The relationship between these three interfaces can be seen in this single line of code. You get an async disposable by subscribing asynchronously to an async observable with an async observer:
async {
let! disposableAsync = observable.SubscribeAsync observerAsync
...
}
Async Observables¶
AsyncRx is an implementation of Async Observable. The difference between an
“Async Observable” and an “Observable” is that with “Async Observables” you
need to await methods such as Subscribe
, OnNext
, OnError
, and
OnCompleted
. In AsyncRx they are thus called SubscribeAsync
,
OnNextAsync
, OnErrorAsync
, and OnCompletedAsync
. This enables
SubscribeAsync
to await async operations i.e setup network connections, and
observers (OnNext
) may finally await side effects such as writing to disk
(observers are all about side-effects right?).
This diagram shows the how Async Observables relates to other collections and values.
Single Value | Multiple Values | |
---|---|---|
Synchronous pull | unit -> ‘a | seq<’a> |
Synchronous push | ‘a -> unit | Observable<’a> |
Asynchronous pull | unit -> Async<’a> | AsyncSeq<’a> |
Asynchronous push | ‘a -> Async<unit> | AsyncObservable<’a> |
Async Observers¶
Observers (IAsyncObserver<'a>
) may be subscribed to observables in
order to receive notifications. An observer is defined as an
implementation of IAsyncObserver<'a>
:
let _obv =
{ new IAsyncObserver<'a> with
member this.OnNextAsync x = async {
printfn "OnNext: %d" x
}
member this.OnErrorAsync err = async {
printfn "OnError: %s" (ex.ToString())
}
member this.OnCompletedAsync () = async {
printfn "OnCompleted"
}
}
There is also a SubscribeAsync
overload that takes a notification
function so instead of using IAsyncObserver<'a>
instances you may
subscribe with a single async function taking a Notification<'a>
.
let obv n =
async {
match n with
| OnNext x -> printfn "OnNext: %d" x
| OnError ex -> printfn "OnError: %s" (ex.ToString())
| OnCompleted -> printfn "OnCompleted"
}
Notifications¶
Observers may written as a function accepting Notification<'a>
which
is defined as the following discriminated union:
type Notification<'a> =
| OnNext of 'a
| OnError of exn
| OnCompleted