Subjects¶
You can think of a “Subject” as being a tube that is open at both sides.
Whatever you put in on one side, pops out at the other end. Subjects combines
both the IAsyncObservable<'a>
and IAsyncObserver<'a>
interfaces.
Whatever you put in (e.g OnNextAsync
) at one side (IAsyncObserver<'a>
)
will come out of the other end (IAsyncObservable<'a>
).
The subject in FSharp.AsyncRx is very similar to the classic Subject in
ReactiveX. The difference is that a is not a single
object, but a tuple of two entangled objects where one implements
IAsyncObserver<'a>
, and the other implements IAsyncObservable<'a>
. This
solves the problem of having a single object trying to be two things at once
(what is the dual of an ISubject<'a>
anyways?).
There are currently 3 types of subjects in AsyncRx, subject, mbSubject and singleSubject.
-
val
subject
: unit -> IAsyncObserver<'a> * IAsyncObservable<'a>¶ The subject will forward any notification pushed to the
IAsyncObserver<'a>
side to all observers that have subscribed to theIAsyncObservable<'a>
side. The subject is hot in the sense that if there are no observers, then the notification will be lost.let dispatch, obs = AsyncRx.subject () let main = async { let! sub = obs.SubscribeAsync obv do! dispatch.OnNextAsync 42 ... } Async.StartImmediate main ()
-
val
mbSubject
: unit -> MailboxProcessor<Notification<'a>> * IAsyncObservable<'a>¶ The Mailbox Subject is the same as a
subject
except that the observer is exposed as aMailboxProcessor<Notification<'a>>
. The mailbox subject is hot in the sense that if there are no observers, then any pushed notification will be lost.let dispatch, obs = AsyncRx.mbSubject () let main = async { let! sub = obs.SubscribeAsync obv do dispatch.Post (OnNext 42) ... } Async.StartImmediate main ()
-
val
singleSubject
: unit -> IAsyncObserver<'a> * IAsyncObservable<'a>¶ The singleSubject will forward any notification pushed to the
IAsyncObserver<'a>
side to a single observer that have subscribed to theIAsyncObservable<'a>
side. The singleSubject is “cold” in the sense that if there’s no-one observing, then the writer will be awaited until there is a subscriber that can observe the value being pushed. You can use a singleSubject in scenarios that requires so called backpressure.let dispatch, obs = AsyncRx.singleSubject () let main = async { let! sub = obs.SubscribeAsync obv do! dispatch.OnNextAsync 42 ... } Async.StartImmediate main ()