Operators¶
The following parameterized async observerable returning functions (operators) are currently supported. Other operators may be implemented on-demand, but the goal is to keep it simple and not make this into a full featured ReactiveX implementation (if possible).
To use the operators open either the FSharp.Control namespace.
open FSharp.Control
xs = AsyncRx.single 42
You can also open the FSharp.Control.AsyncRx
module if you don’t
want to prepend every operator with AsyncRx
. Be aware of
possible namespace conflicts with operators such as map
.
open FSharp.Control.AsyncRx
xs = single 42
Creating¶
Functions for creating ('a -> IAsyncObservable<'a>
) an async observable.
-
val
empty
: : unit -> IAsyncObservable<'a>¶ Returns an async observable sequence with no elements. You must usually indicate which type the resulting observable should be since empty itself doesn’t produce any values.
Example:
let xs = AsyncRx.empty<int> ()
-
val
single
: x:'a -> IAsyncObservable<'a>¶ Returns an observable sequence containing the single specified element.
Example:
let xs = AsyncRx.single 42
-
val
fail
: error:exn -> IAsyncObservable<'a>¶ Returns the observable sequence that terminates exceptionally with the specified exception.
Example:
exception MyError of string let error = MyError "error" let xs = AsyncRx.fail<int> error
-
val
defer
: factory:(unit -> IAsyncObservable<'a>) -> IAsyncObservable<'a>¶ Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
-
val
create
: subscribe:(IAsyncObserver<'a> -> Async<IAsyncDisposable>) -> IAsyncObservable<'a>¶ Creates an async observable (AsyncObservable<’a>) from the given subscribe function.
-
val
ofSeq
: seq<'a> -> IAsyncObservable<'a>¶ Returns the async observable sequence whose elements are pulled from the given enumerable sequence.
-
val
ofAsyncSeq
: AsyncSeq<'a> -> IAsyncObservable<'a>¶ Convert async sequence into an async observable (Not available in Fable).
-
val
timer
: int -> IAsyncObservable<int>¶ Returns an observable sequence that triggers the value 0 after the given duetime.
-
val
interval
: int -> IAsyncObservable<int>¶ Returns an observable sequence that triggers the increasing sequence starting with 0 after the given period.
Transforming¶
Functions for transforming (IAsyncObservable<'a> ->
IAsyncObservable<'b>
) an async observable.
-
val
map
: mapper:('a -> 'b) -> source: IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Returns an observable sequence whose elements are the result of invoking the mapper function on each element of the source.
Example:
let mapper x = x * 10 let xs = AsyncRx.single 42 |> AsyncRx.map mapper
-
val
mapi
: mapper:('a*int -> 'b) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Returns an observable sequence whose elements are the result of invoking the mapper function and incorporating the element’s index on each element of the source.
-
val
mapAsync
: ('a -> Async<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Returns an observable sequence whose elements are the result of invoking the async mapper function on each element of the source.
-
val
mapiAsync
: ('a*int -> Async<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Returns an observable sequence whose elements are the result of invoking the async mapper function by incorporating the element’s index on each element of the source.
-
val
flatMap
: ('a -> IAsyncObservable<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Projects each element of an observable sequence into an observable sequence and merges the resulting observable sequences back into one observable sequence.
-
val
flatMapi
: ('a*int -> IAsyncObservable<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Projects each element of an observable sequence into an observable sequence by incorporating the element’s index on each element of the source. Merges the resulting observable sequences back into one observable sequence.
-
val
flatMapAsync
: ('a -> Async\<IAsyncObservable\<'b\>\>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Asynchronously projects each element of an observable sequence into an observable sequence and merges the resulting observable sequences back into one observable sequence.
-
val
flatMapiAsync
: ('a*int -> Async<IAsyncObservable\<'b\>\>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Asynchronously projects each element of an observable sequence into an observable sequence by incorporating the element’s index on each element of the source. Merges the resulting observable sequences back into one observable sequence.
-
val
flatMapLatest
: ('a -> IAsyncObservable<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Transforms the items emitted by an source sequence into observable streams, and mirror those items emitted by the most-recently transformed observable sequence.
-
val
flatMapLatestAsync
: ('a -> Async<IAsyncObservable\<'b\>\>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Asynchronosly transforms the items emitted by an source sequence into observable streams, and mirror those items emitted by the most-recently transformed observable sequence.
-
val
catch
: (exn -> IAsyncObservable<'a>) -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Returns an observable sequence containing the first sequence’s elements, followed by the elements of the handler sequence in case an exception occurred.
Filtering¶
Functions for filtering (IAsyncObservable<'a> ->
IAsyncObservable<'a>
) an async observable.
-
val
filter
: predicate:('a -> bool) -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Filters the elements of an observable sequence based on a predicate. Returns an observable sequence that contains elements from the input sequence that satisfy the condition.
Example:
let predicate x = x < 3 let xs = AsyncRx.ofSeq <| seq { 1..5 } |> AsyncRx.filter predicate
-
val
filterAsync
: ('a -> Async<bool>) -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Filters the elements of an observable sequence based on an async predicate. Returns an observable sequence that contains elements from the input sequence that satisfy the condition.
-
val
distinctUntilChanged
: IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Return an observable sequence only containing the distinct contiguous elementsfrom the source sequence.
-
val
takeUntil
: IAsyncObservable<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Returns the values from the source observable sequence until the other observable sequence produces a value.
-
val
choose
: ('a -> 'b option) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Applies the given function to each element of the stream and returns the stream comprised of the results for each element where the function returns Some with some value.
-
val
chooseAsync
: ('a -> Async<'b option>) -> IAsyncObservable<'a> -> IAsyncObservable<'b>¶ Applies the given async function to each element of the stream and returns the stream comprised of the results for each element where the function returns Some with some value.
Aggregating¶
-
val
scan
: initial:'s -> accumulator:('s -> 'a -> 's) -> source: IAsyncObservable<'a> -> IAsyncObservable<'s>¶ Applies an accumulator function over an observable sequence for every value ‘a and returns each intermediate result ‘s. The initial seed value is used as the initial accumulator value. Returns an observable sequence containing the accumulated values ‘s.
Example:
let scanner a x = a + x let xs = AsyncRx.ofSeq <| seq { 1..5 } |> AsyncRx.scan 0 scanner
-
val
scanAsync
: initial: 's -> accumulator: ('s -> 'a -> Async<'s>) -> source: IAsyncObservable<'a> -> IAsyncObservable<'s>¶ Applies an async accumulator function over an observable sequence and returns each intermediate result. The seed value is used as the initial accumulator value. Returns an observable sequence containing the accumulated values.
Example:
let scannerAsync a x = async { return a + x } let xs = AsyncRx.ofSeq <| seq { 1..5 } |> AsyncRx.scanAsync 0 scannerAsync
-
val
groupBy
: keyMapper: ('a -> 'g) -> source: IAsyncObservable<'a> -> IAsyncObservable<IAsyncObservable<'a>>¶ Groups the elements of an observable sequence according to a specified key mapper function. Returns a sequence of observable groups, each of which corresponds to a given key.
Example:
let xs = AsyncRx.ofSeq [1; 2; 3; 4; 5; 6] |> AsyncRx.groupBy (fun x -> x % 2) |> AsyncRx.flatMap (fun x -> x)
Combining¶
Functions for combining multiple async observables into one.
-
val
merge
: IAsyncObservable<'a> -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Merges an observable sequence with another observable sequence.
-
val
mergeInner
: IAsyncObservable\<IAsyncObservable<'a>\> -> IAsyncObservable<'a>¶ Merges an observable sequence of observable sequences into an observable sequence.
-
val
switchLatest
: IAsyncObservable<IAsyncObservable<'a>> -> IAsyncObservable<'a>¶ Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
-
val
concat
: seq<IAsyncObservable<'a>> -> IAsyncObservable<'a>¶ Concatenates an observable sequence with another observable sequence.
-
val
startWith
: seq<'a> -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Prepends a sequence of values to an observable sequence. Returns the source sequence prepended with the specified values.
-
val
combineLatest
: IAsyncObservable<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a*'b>¶ Merges the specified observable sequences into one observable sequence by combining elements of the sources into tuples. Returns an observable sequence containing the combined results.
-
val
withLatestFrom
: IAsyncObservable<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a*'b>¶ Merges the specified observable sequences into one observable sequence by combining the values into tuples only when the first observable sequence produces an element. Returns the combined observable sequence.
-
val
zipSeq
: seq<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a*'b>¶ Zip given sequence with source. Combines one and one item from each stream into one tuple.
Time-shifting¶
Functions for time-shifting (IAsyncObservable<'a> ->
IAsyncObservable<'a>
) an async observable.
-
val
delay
: int -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Time shifts the observable sequence by the given timeout. The relative time intervals between the values are preserved.
-
val
debounce
: int -> IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Ignores values from an observable sequence which are followed by another value before the given timeout.
-
val
sample
: msecs: int -> source: IAsyncObservable<'a> -> IAsyncObservable<'a>¶ Samples the observable sequence at each interval.