Pankaj Kapare Pankaj Kapare - 5 months ago 42
Javascript Question

RxJS adding new item to array stream not getting published to subscriber

I am learning RxJS. I have question regarding below code snippet.

var arr = [1,2,3,4,5];
var arraysource = Observable.from(arr);

var subscription = arraysource.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));


When I run above code I get following output.

onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6

My question is why seventh element is not getting published which is added after subscription? Is it because input stream is cold stream and its reading items synchronously? So items added post onComplete fires will never reach to observer? Can someone please throw some light on this behavior?


You could create your own Observable to get the functionality you're looking for

Rx 5 Beta (change next and complete to onNext onCompleted for Rx 4)

var source = Rx.Observable.create(function (observer) {
    [1,2,3,4,5].forEach(item =>;;
    // observer.complete() // <-- remove comment to allow observable to complete
    // Any cleanup logic might go here
    return function () {

var subscription = source.subscribe(
    function (x) { console.log('onNext: %s', x); },
    function (e) { console.log('onError: %s', e); },
    function () { console.log('onCompleted'); }