Custom Observables¶
There are many ways to create custom observble streams. Note that the
goal is to have some kind of create function that returns an
IAsyncObservable<'a>
. We will go through a few options for creating
such a custom stream.
1. Use a Subject¶
A Subject in AsyncRx return both an observer (IAsyncObserver
) and an
observable (IAsyncObservable
). Note that we need to use
Async.Start
to start the worker function so it runs concurrently.
open FSharp.Control
let myStream () =
let dispatch, obs = AsyncRx.subject<Msg> ()
let worker () = async {
while true do
let! msg = getMessageAsync ()
do! dispatch.OnNextAsync msg
}
Async.Start (worker ())
obs
2. Use Create¶
The AsyncRx.Create
function takes an Async
subscribe function and
returns an IAsyncObservable
. Note that we need to use Async.Start to start
the worker function so it runs concurrently.
open FSahrp.Control
let myStream () =
let subscribeAsync (obs: IAsyncObserver<Msg>) : Async<IAsyncDisposable> =
let mutable running = true
async {
let worker () = async {
while running do
let! msg = getMessageAsync ()
do! obs.OnNextAsync msg
}
Async.Start (worker ())
let cancel () = async {
running <- false
}
return AsyncDisposable.Create(cancel)
}
AsyncRx.create(subscribeAsync)
3. Use ofAsyncWorker¶
The ofAsyncWorker
is a handy utility function for creating an
IAsyncObservable
from an async worker function, where the worker
function has the type IAsyncObserver<'a> -> CancellationToken ->
Async<unit>
. Thus the worker will receive a cancellation token that
can be used to detect if cancellation (dispose) have been requested.
open System.Threading
open FSharp.Control
let myStream' () =
let worker (obv: IAsyncObserver<Msg>) (token: CancellationToken) = async {
while not token.IsCancellationRequested do
let! msg = getMessageAsync ()
do! obv.OnNextAsync msg
}
AsyncRx.ofAsyncWorker(worker)