Subjects
You can think of a “Subject” as a tube that is open at both ends.
Whatever you put in at one end pops out at the other. A subject combines
both the IAsyncObservable<'a> and IAsyncObserver<'a> interfaces.
Whatever you put in (e.g. OnNextAsync) at one side (IAsyncObserver<'a>)
will come out of the other end (IAsyncObservable<'a>).

The subject in FSharp.AsyncRx is very similar to the classic Subject in
ReactiveX. The difference is that a subject is not a single
object, but a tuple of two entangled objects where one implements
IAsyncObserver<'a>, and the other implements IAsyncObservable<'a>. This
solves the problem of having a single object trying to be two things at once
(what is the dual of an ISubject<'a> anyway?).

There are currently three types of subjects in AsyncRx: subject, mbSubject, and singleSubject.
- val subject: unit -> IAsyncObserver<'a> * IAsyncObservable<'a>

  The subject will forward any notification pushed to the IAsyncObserver<'a>
  side to all observers that have subscribed to the IAsyncObservable<'a> side.
  The subject is hot in the sense that if there are no observers, then the
  notification will be lost.

      let dispatch, obs = AsyncRx.subject ()

      let main = async {
          let! sub = obs.SubscribeAsync obv
          do! dispatch.OnNextAsync 42
          ...
      }

      Async.StartImmediate main
- val mbSubject: unit -> MailboxProcessor<Notification<'a>> * IAsyncObservable<'a>

  The mailbox subject is the same as a subject, except that the observer is
  exposed as a MailboxProcessor<Notification<'a>>. The mailbox subject is hot
  in the sense that if there are no observers, then any pushed notification
  will be lost.

      let dispatch, obs = AsyncRx.mbSubject ()

      let main = async {
          let! sub = obs.SubscribeAsync obv
          do dispatch.Post (OnNext 42)
          ...
      }

      Async.StartImmediate main
- val singleSubject: unit -> IAsyncObserver<'a> * IAsyncObservable<'a>

  The singleSubject will forward any notification pushed to the
  IAsyncObserver<'a> side to a single observer that has subscribed to the
  IAsyncObservable<'a> side. The singleSubject is “cold” in the sense that if
  there is no one observing, then the writer will be awaited until there is a
  subscriber that can observe the value being pushed. You can use a
  singleSubject in scenarios that require so-called backpressure.

      let dispatch, obs = AsyncRx.singleSubject ()

      let main = async {
          let! sub = obs.SubscribeAsync obv
          do! dispatch.OnNextAsync 42
          ...
      }

      Async.StartImmediate main
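
The examples above pass an observer named obv to SubscribeAsync without defining it. The following is a minimal sketch of one way to fill that gap, not the library's prescribed approach. It assumes the FSharp.AsyncRx package is referenced, that its types live in the FSharp.Control namespace, and that IAsyncObserver<'a> has exactly the three members OnNextAsync, OnErrorAsync and OnCompletedAsync. The printing observer is hypothetical and exists only for illustration.

```fsharp
open FSharp.Control // assumption: namespace that exposes AsyncRx and IAsyncObserver<'a>

// Hypothetical observer for illustration: prints every notification it receives.
let obv =
    { new IAsyncObserver<int> with
        member _.OnNextAsync (x: int) = async { printfn "OnNext: %d" x }
        member _.OnErrorAsync (err: exn) = async { printfn "OnError: %s" err.Message }
        member _.OnCompletedAsync () = async { printfn "OnCompleted" } }

// The subject is a tuple: the first element is the side you push into,
// the second element is the side you subscribe to.
let (dispatch, obs) : IAsyncObserver<int> * IAsyncObservable<int> =
    AsyncRx.subject ()

let main = async {
    // Subscribe before pushing: the subject is hot, so a notification
    // pushed while nobody is subscribed would be lost.
    let! sub = obs.SubscribeAsync obv
    do! dispatch.OnNextAsync 42
    do! dispatch.OnCompletedAsync ()
}

Async.StartImmediate main
```

The same observer can be passed to the observable side of mbSubject or singleSubject. Only the way notifications are pushed differs between the three.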