Ivan Kleshnin - 9 months ago
Javascript Question

How onComplete actually works in RxJS

Observables complete naturally if they are constructed from finite data.

import {Observable, Subject} from "rx";

let stream0$ = Observable.of("1", "2", "3");
let stream1$ = stream0$.map(x => x);

stream1$.subscribe(
  (val) => { console.log("onNext", val) },
  (err) => { console.log("onError", err) },
  () => { console.log("onCompleted") }
);

// onNext 1
// onNext 2
// onNext 3
// onCompleted

Otherwise, they don't. But what about observables that are subscribed to subjects? For example:

import {Observable, Subject} from "rx";

let subj$ = new Subject();
let stream1$ = subj$.map(x => x);

stream1$.subscribe(
  (val) => { console.log("onNext", val) },
  (err) => { console.log("onError", err) },
  () => { console.log("onCompleted") }
);

subj$.onNext("foo");

// onNext foo

"onCompleted" is not logged even though the source has ended. Can we pass this "end" event downstream somehow? I've found no information about this important topic in the docs. It would be great to see a diagram like the one in "Hot and Cold observables: are there 'hot' and 'cold' operators?" to nail down that event flow.


With a subject, you are completely in control. Rx.Subject implements the observer interface and it is that observer interface that you use when you call onNext.

A subject assumes that all serialization and grammatical correctness are handled by the caller of the subject.

That means, among other things, that it is up to you to signal completion and error. To signal completion, call onCompleted on the subject (in the example above, subj$.onCompleted()). FYI, here is the aforementioned grammar:

This grammar allows observable sequences to send any amount (0 or more) of onNext messages to the subscribed observer instance, optionally followed by a single success (onCompleted) or failure (onError) message.

The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations.

A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences.
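To make that event flow concrete, here is a minimal sketch in plain JavaScript. It is not the Rx implementation: `MiniSubject` and `demo` are hypothetical names made up for illustration, and this `map` subscribes eagerly, whereas Rx operators are lazy. It only models the two points above: the caller of the subject decides when to complete, and an operator such as `map` forwards the completion message downstream.

```javascript
// Hypothetical minimal model of a subject; NOT the actual Rx.Subject code.
class MiniSubject {
  constructor() {
    this.observers = [];
    this.stopped = false; // true once onError or onCompleted has been sent
  }

  subscribe(onNext, onError, onCompleted) {
    this.observers.push({ onNext, onError, onCompleted });
  }

  // Grammar: any number of onNext calls...
  onNext(value) {
    if (this.stopped) return;
    this.observers.forEach((o) => o.onNext(value));
  }

  // ...optionally followed by a single onError...
  onError(err) {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach((o) => o.onError(err));
  }

  // ...or a single onCompleted. Nothing is delivered afterwards.
  onCompleted() {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach((o) => o.onCompleted());
  }

  // Simplified map: transforms values and forwards error and completion.
  map(fn) {
    const out = new MiniSubject();
    this.subscribe(
      (v) => out.onNext(fn(v)),
      (e) => out.onError(e),
      () => out.onCompleted()
    );
    return out;
  }
}

function demo() {
  const log = [];
  const subj$ = new MiniSubject();
  const stream1$ = subj$.map((x) => x.toUpperCase());

  stream1$.subscribe(
    (val) => log.push("onNext " + val),
    (err) => log.push("onError " + err),
    () => log.push("onCompleted")
  );

  subj$.onNext("foo");
  subj$.onCompleted(); // the caller signals the end explicitly
  subj$.onNext("bar"); // ignored: the sequence has already terminated
  return log;
}

console.log(demo().join("\n"));
```

In this sketch, the subscriber of `stream1$` sees "onCompleted" only because the code that owns `subj$` called `onCompleted()` itself; without that call the sequence simply stays open.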

NOTE: for RxJS v5, the observer interface has changed: the methods are named next, error and complete instead of onNext, onError and onCompleted.