Custom Observables

There are several ways to create custom observable streams. In each case the goal is a create function that returns an IAsyncObservable<'a>. This section goes through a few options for creating such a stream.

1. Use a Subject

A Subject in AsyncRx returns both an observer (IAsyncObserver) and an observable (IAsyncObservable); values pushed to the observer are delivered to subscribers of the observable. Note that the worker function must be started with Async.Start so that it runs concurrently.

open FSharp.Control

let myStream () =
    let dispatch, obs = AsyncRx.subject<Msg> ()

    let worker () = async {
        while true do
            let! msg = getMessageAsync ()
            do! dispatch.OnNextAsync msg
    }

    Async.Start (worker ())
    obs
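
On the consuming side, a stream like this is used by subscribing an observer to it. The following is a minimal sketch, assuming IAsyncObserver exposes OnNextAsync, OnErrorAsync and OnCompletedAsync, and that SubscribeAsync returns a disposable with a DisposeAsync member. The Msg type, collector, and consume are hypothetical names introduced for illustration, since the text above leaves Msg undefined.

```fsharp
open FSharp.Control

// Hypothetical message type; the surrounding text does not define Msg.
type Msg =
    | Tick of int

// Messages seen so far, kept only so the sketch has something to inspect.
let received = ResizeArray<Msg> ()

// An observer built with an object expression: records each message and
// logs errors and completion.
let collector =
    { new IAsyncObserver<Msg> with
        member this.OnNextAsync msg = async { received.Add msg }
        member this.OnErrorAsync err = async { printfn "Error: %s" err.Message }
        member this.OnCompletedAsync () = async { printfn "Completed" } }

// Subscribes to a stream, lets it run for about a second, then disposes
// the subscription.
let consume (stream: IAsyncObservable<Msg>) = async {
    let! subscription = stream.SubscribeAsync collector
    do! Async.Sleep 1000
    do! subscription.DisposeAsync ()
}
```

With the definitions above, consume (myStream ()) |> Async.RunSynchronously would collect messages for roughly one second. Disposing the subscription does not stop the background worker in this variant, because its while true loop never checks for cancellation.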

2. Use Create

The AsyncRx.create function takes an async subscribe function of type IAsyncObserver<'a> -> Async<IAsyncDisposable> and returns an IAsyncObservable. As before, the worker function must be started with Async.Start so that it runs concurrently.

open FSharp.Control

let myStream () =
    let subscribeAsync (obs: IAsyncObserver<Msg>) : Async<IAsyncDisposable> =
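        // A ref cell is used because closures cannot capture `let mutable` locals.
        let running = ref true

        async {
            let worker () = async {
                while running.Value do
                    let! msg = getMessageAsync ()
                    do! obs.OnNextAsync msg
            }

            Async.Start (worker ())

            // Disposing the subscription clears the flag, which ends the worker loop.
            let cancel () = async {
                running.Value <- false
            }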

            return AsyncDisposable.Create(cancel)
        }

    AsyncRx.create(subscribeAsync)

3. Use ofAsyncWorker

ofAsyncWorker is a handy utility function for creating an IAsyncObservable from an async worker function of type IAsyncObserver<'a> -> CancellationToken -> Async<unit>. The worker receives a cancellation token that it can use to detect whether cancellation (dispose) has been requested.

open System.Threading
open FSharp.Control

let myStream' () =
    let worker (obv: IAsyncObserver<Msg>) (token: CancellationToken) = async {
        while not token.IsCancellationRequested do
            let! msg = getMessageAsync ()
            do! obv.OnNextAsync msg
    }

    AsyncRx.ofAsyncWorker(worker)
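
To make the cancellation behaviour concrete, the sketch below reproduces the same worker-loop pattern using only the .NET CancellationTokenSource, without AsyncRx. The emit callback stands in for obv.OnNextAsync, and produce, runDemo, and the timings are hypothetical choices made for illustration. It shows how a token-checking loop ends; it is not a description of how ofAsyncWorker is implemented internally.

```fsharp
open System.Threading

// Stand-in for the worker above: `emit` plays the role of obv.OnNextAsync.
// The loop re-checks the token before each emission, so it exits shortly
// after cancellation has been requested.
let produce (emit: int -> Async<unit>) (token: CancellationToken) = async {
    let counter = ref 0
    while not token.IsCancellationRequested do
        counter.Value <- counter.Value + 1
        do! emit counter.Value
        do! Async.Sleep 10
}

// Runs the worker in the background, requests cancellation after a short
// delay, and returns the values emitted before the loop stopped.
let runDemo () =
    let received = ResizeArray<int> ()
    let emit x = async { received.Add x }
    use cts = new CancellationTokenSource ()
    // The token is passed as an ordinary argument, so the workflow ends by
    // leaving the loop rather than by being aborted.
    let task = produce emit cts.Token |> Async.StartAsTask
    Thread.Sleep(100)
    cts.Cancel ()   // plays the role of disposing the subscription
    task.Wait ()    // returns once the loop has observed the token
    List.ofSeq received
```

Because the worker only polls the token between messages, a call that is already waiting inside getMessageAsync would finish before the loop notices the cancellation request.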