================== Custom Observables ================== There are many ways to create custom observble streams. Note that the goal is to have some kind of create function that returns an ``IAsyncObservable<'a>``. We will go through a few options for creating such a custom stream. 1. Use a Subject ================ A Subject in AsyncRx return both an observer (``IAsyncObserver``) and an observable (``IAsyncObservable``). Note that we need to use ``Async.Start`` to start the worker function so it runs concurrently. .. code:: fsharp open FSharp.Control let myStream () = let dispatch, obs = AsyncRx.subject () let worker () = async { while true do let! msg = getMessageAsync () do! dispatch.OnNextAsync msg } Async.Start (worker ()) obs 2. Use Create ============= The ``AsyncRx.Create`` function takes an ``Async`` subscribe function and returns an ``IAsyncObservable``. Note that we need to use `Async.Start` to start the worker function so it runs concurrently. .. code:: fsharp open FSahrp.Control let myStream () = let subscribeAsync (obs: IAsyncObserver) : Async = let mutable running = true async { let worker () = async { while running do let! msg = getMessageAsync () do! obs.OnNextAsync msg } Async.Start (worker ()) let cancel () = async { running <- false } return AsyncDisposable.Create(cancel) } AsyncRx.create(subscribeAsync) 3. Use ofAsyncWorker ==================== The ``ofAsyncWorker`` is a handy utility function for creating an ``IAsyncObservable`` from an async worker function, where the worker function has the type ``IAsyncObserver<'a> -> CancellationToken -> Async``. Thus the worker will receive a cancellation token that can be used to detect if cancellation (dispose) have been requested. .. code:: fsharp open System.Threading open FSharp.Control let myStream' () = let worker (obv: IAsyncObserver) (token: CancellationToken) = async { while not token.IsCancellationRequested do let! msg = getMessageAsync () do! obv.OnNextAsync msg } AsyncRx.ofAsyncWorker(worker)