========= Operators ========= The following parameterized async observerable returning functions (operators) are currently supported. Other operators may be implemented on-demand, but the goal is to keep it simple and not make this into a full featured `ReactiveX `_ implementation (if possible). To use the operators open either the `FSharp.Control` namespace. .. code:: fsharp open FSharp.Control xs = AsyncRx.single 42 You can also open the ``FSharp.Control.AsyncRx`` module if you don't want to prepend every operator with ``AsyncRx``. Be aware of possible namespace conflicts with operators such as ``map``. .. code:: fsharp open FSharp.Control.AsyncRx xs = single 42 Creating ======== Functions for creating (``'a -> IAsyncObservable<'a>``) an async observable. .. val:: empty :type: : unit -> IAsyncObservable<'a> Returns an async observable sequence with no elements. You must usually indicate which type the resulting observable should be since empty itself doesn't produce any values. .. marble:: :alt: empty [ empty () ] --| **Example:** .. code:: fsharp let xs = AsyncRx.empty () .. val:: single :type: x:'a -> IAsyncObservable<'a> Returns an observable sequence containing the single specified element. .. marble:: :alt: single [ single 42 ] -42-| **Example:** .. code:: fsharp let xs = AsyncRx.single 42 .. val:: fail :type: error:exn -> IAsyncObservable<'a> Returns the observable sequence that terminates exceptionally with the specified exception. **Example:** .. code:: fsharp exception MyError of string let error = MyError "error" let xs = AsyncRx.fail error .. val:: defer :type: factory:(unit -> IAsyncObservable<'a>) -> IAsyncObservable<'a> Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes. .. val:: create :type: subscribe:(IAsyncObserver<'a> -> Async) -> IAsyncObservable<'a> Creates an async observable (`AsyncObservable<'a>`) from the given subscribe function. .. val:: ofSeq :type: seq<'a> -> IAsyncObservable<'a> Returns the async observable sequence whose elements are pulled from the given enumerable sequence. .. val:: ofAsyncSeq :type: AsyncSeq<'a> -> IAsyncObservable<'a> Convert async sequence into an async observable *(Not available in Fable)*. .. val:: timer :type: int -> IAsyncObservable Returns an observable sequence that triggers the value 0 after the given duetime. .. val:: interval :type: int -> IAsyncObservable Returns an observable sequence that triggers the increasing sequence starting with 0 after the given period. Transforming ============ Functions for transforming (``IAsyncObservable<'a> -> IAsyncObservable<'b>``) an async observable. .. val:: map :type: mapper:('a -> 'b) -> source: IAsyncObservable<'a> -> IAsyncObservable<'b> Returns an observable sequence whose elements are the result of invoking the mapper function on each element of the source. .. marble:: :alt: map ---1---2---3---4---> [ map (fun i -> i*2) ] ---2---4---6---8---> **Example:** .. code:: fsharp let mapper x = x * 10 let xs = AsyncRx.single 42 |> AsyncRx.map mapper .. val:: mapi :type: mapper:('a*int -> 'b) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Returns an observable sequence whose elements are the result of invoking the mapper function and incorporating the element's index on each element of the source. .. marble:: :alt: map ----1----2----3----4----> [ mapi (fun (x, i) -> i*2) ] ----2----4----6----8----> .. val:: mapAsync :type: ('a -> Async<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Returns an observable sequence whose elements are the result of invoking the async mapper function on each element of the source. .. marble:: :alt: map ------1------2------3------4------> [ mapAsync (fun i -> async { return i*2 }) ] ------2------4------6------8------> .. val:: mapiAsync :type: ('a*int -> Async<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Returns an observable sequence whose elements are the result of invoking the async mapper function by incorporating the element's index on each element of the source. .. val:: flatMap :type: ('a -> IAsyncObservable<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Projects each element of an observable sequence into an observable sequence and merges the resulting observable sequences back into one observable sequence. .. val:: flatMapi :type: ('a*int -> IAsyncObservable<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Projects each element of an observable sequence into an observable sequence by incorporating the element's index on each element of the source. Merges the resulting observable sequences back into one observable sequence. .. val:: flatMapAsync :type: ('a -> Async\\>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Asynchronously projects each element of an observable sequence into an observable sequence and merges the resulting observable sequences back into one observable sequence. .. val:: flatMapiAsync :type: ('a*int -> Async\>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Asynchronously projects each element of an observable sequence into an observable sequence by incorporating the element's index on each element of the source. Merges the resulting observable sequences back into one observable sequence. .. val:: flatMapLatest :type: ('a -> IAsyncObservable<'b>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Transforms the items emitted by an source sequence into observable streams, and mirror those items emitted by the most-recently transformed observable sequence. .. val:: flatMapLatestAsync :type: ('a -> Async\>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Asynchronosly transforms the items emitted by an source sequence into observable streams, and mirror those items emitted by the most-recently transformed observable sequence. .. val:: catch :type: (exn -> IAsyncObservable<'a>) -> IAsyncObservable<'a> -> IAsyncObservable<'a> Returns an observable sequence containing the first sequence's elements, followed by the elements of the handler sequence in case an exception occurred. Filtering ========= Functions for filtering (``IAsyncObservable<'a> -> IAsyncObservable<'a>``) an async observable. .. val:: filter :type: predicate:('a -> bool) -> IAsyncObservable<'a> -> IAsyncObservable<'a> Filters the elements of an observable sequence based on a predicate. Returns an observable sequence that contains elements from the input sequence that satisfy the condition. .. marble:: :alt: filter -----1----2----3----4----| [ filter (fun i -A i>2) ] ---------------3----4----| **Example:** .. code:: fsharp let predicate x = x < 3 let xs = AsyncRx.ofSeq <| seq { 1..5 } |> AsyncRx.filter predicate .. val:: filterAsync :type: ('a -> Async) -> IAsyncObservable<'a> -> IAsyncObservable<'a> Filters the elements of an observable sequence based on an async predicate. Returns an observable sequence that contains elements from the input sequence that satisfy the condition. .. val:: distinctUntilChanged :type: IAsyncObservable<'a> -> IAsyncObservable<'a> Return an observable sequence only containing the distinct contiguous elementsfrom the source sequence. .. val:: takeUntil :type: IAsyncObservable<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a> Returns the values from the source observable sequence until the other observable sequence produces a value. .. val:: choose :type: ('a -> 'b option) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Applies the given function to each element of the stream and returns the stream comprised of the results for each element where the function returns Some with some value. .. val:: chooseAsync :type: ('a -> Async<'b option>) -> IAsyncObservable<'a> -> IAsyncObservable<'b> Applies the given async function to each element of the stream and returns the stream comprised of the results for each element where the function returns Some with some value. Aggregating =========== .. val:: scan :type: initial:'s -> accumulator:('s -> 'a -> 's) -> source: IAsyncObservable<'a> -> IAsyncObservable<'s> Applies an accumulator function over an observable sequence for every value `'a` and returns each intermediate result `'s`. The `initial` seed value is used as the initial accumulator value. Returns an observable sequence containing the accumulated values `'s`. **Example:** .. code:: fsharp let scanner a x = a + x let xs = AsyncRx.ofSeq <| seq { 1..5 } |> AsyncRx.scan 0 scanner .. val:: scanAsync :type: initial: 's -> accumulator: ('s -> 'a -> Async<'s>) -> source: IAsyncObservable<'a> -> IAsyncObservable<'s> Applies an async accumulator function over an observable sequence and returns each intermediate result. The seed value is used as the initial accumulator value. Returns an observable sequence containing the accumulated values. **Example:** .. code:: fsharp let scannerAsync a x = async { return a + x } let xs = AsyncRx.ofSeq <| seq { 1..5 } |> AsyncRx.scanAsync 0 scannerAsync .. val:: groupBy :type: keyMapper: ('a -> 'g) -> source: IAsyncObservable<'a> -> IAsyncObservable> Groups the elements of an observable sequence according to a specified key mapper function. Returns a sequence of observable groups, each of which corresponds to a given key. **Example:** .. code:: fsharp let xs = AsyncRx.ofSeq [1; 2; 3; 4; 5; 6] |> AsyncRx.groupBy (fun x -> x % 2) |> AsyncRx.flatMap (fun x -> x) Combining ========= Functions for combining multiple async observables into one. .. val:: merge :type: IAsyncObservable<'a> -> IAsyncObservable<'a> -> IAsyncObservable<'a> Merges an observable sequence with another observable sequence. .. val:: mergeInner :type: IAsyncObservable\\> -> IAsyncObservable<'a> Merges an observable sequence of observable sequences into an observable sequence. .. val:: switchLatest :type: IAsyncObservable> -> IAsyncObservable<'a> Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. .. val:: concat :type: seq> -> IAsyncObservable<'a> Concatenates an observable sequence with another observable sequence. .. val:: startWith :type: seq<'a> -> IAsyncObservable<'a> -> IAsyncObservable<'a> Prepends a sequence of values to an observable sequence. Returns the source sequence prepended with the specified values. .. val:: combineLatest :type: IAsyncObservable<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a*'b> Merges the specified observable sequences into one observable sequence by combining elements of the sources into tuples. Returns an observable sequence containing the combined results. .. val:: withLatestFrom :type: IAsyncObservable<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a*'b> Merges the specified observable sequences into one observable sequence by combining the values into tuples only when the first observable sequence produces an element. Returns the combined observable sequence. .. val:: zipSeq :type: seq<'b> -> IAsyncObservable<'a> -> IAsyncObservable<'a*'b> Zip given sequence with source. Combines one and one item from each stream into one tuple. Time-shifting ============= Functions for time-shifting (``IAsyncObservable<'a> -> IAsyncObservable<'a>``) an async observable. .. val:: delay :type: int -> IAsyncObservable<'a> -> IAsyncObservable<'a> Time shifts the observable sequence by the given timeout. The relative time intervals between the values are preserved. .. val:: debounce :type: int -> IAsyncObservable<'a> -> IAsyncObservable<'a> Ignores values from an observable sequence which are followed by another value before the given timeout. .. val:: sample :type: msecs: int -> source: IAsyncObservable<'a> -> IAsyncObservable<'a> Samples the observable sequence at each interval. Leaving ======= Functions for leaving (``IAsyncObservable<'a> -> 'a``) the async observable. .. val:: toAsyncSeq :type: IAsyncObservable<'a> -> AsyncSeq<'a> *(Not available in Fable)*