Pankaj Kapare - 3 months ago
Javascript Question

RxJS adding new item to array stream not getting published to subscriber

I am learning RxJS and have a question about the code snippet below.

var arr = [1,2,3,4,5];
var arraysource = Observable.from(arr);

arr.push(6);
var subscription = arraysource.subscribe(
    x => console.log('onNext: %s', x),
    e => console.log('onError: %s', e),
    () => console.log('onCompleted'));

arr.push(7);


When I run the above code, I get the following output.

onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onCompleted


My question is: why is the seventh element, which is added after the subscription, not published? Is it because the input stream is a cold stream that reads its items synchronously, so items added after onCompleted fires never reach the observer? Can someone please shed some light on this behavior?

Answer

You could create your own Observable to get the functionality you're looking for.

Rx 5 Beta (for Rx 4, change next and complete to onNext and onCompleted):

var source = Rx.Observable.create(function (observer) {
    [1,2,3,4,5].forEach(item => observer.next(item));

    observer.next(6);
    // observer.complete(); // <-- uncomment to let the observable complete
    // Any cleanup logic might go here
    return function () {
        console.log('disposed');
    };
});


var subscription = source.subscribe(
    function (x) { console.log('onNext: %s', x); },
    function (e) { console.log('onError: %s', e); },
    function () { console.log('onCompleted'); }
);

Example: http://jsbin.com/datuqoniyo/edit?js,console
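Because the observable above never calls complete, it can in principle keep delivering values after subscribe() has returned. The sketch below illustrates that idea in plain JavaScript. Note that create here is a hypothetical, heavily simplified stand-in for Rx.Observable.create (not the library's implementation), and liveObserver and seen are illustrative names that do not come from the original code.

```javascript
// Hypothetical minimal stand-in for Rx.Observable.create, for illustration only.
// It calls the supplied function once per subscribe() and hands it an observer.
function create(subscribeFn) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      var observer = { next: onNext, error: onError, complete: onCompleted };
      var dispose = subscribeFn(observer);
      return { unsubscribe: dispose };
    }
  };
}

var liveObserver = null; // illustrative handle used to emit after subscribe()

var source = create(function (observer) {
  [1, 2, 3, 4, 5].forEach(function (item) { observer.next(item); });
  liveObserver = observer; // keep the observer so later values can be sent
  // Cleanup: drop the handle when the subscriber goes away.
  return function () { liveObserver = null; };
});

var seen = [];
var subscription = source.subscribe(
  function (x) { seen.push(x); console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); }
);

// These values arrive after subscribe() has returned, and still reach the subscriber.
liveObserver.next(6);
liveObserver.next(7);

subscription.unsubscribe();
```

In RxJS itself, a Subject is the usual tool for pushing values to subscribers from outside; the sketch only shows why not completing matters.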

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#creating-a-sequence-from-scratch

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-create
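On the "why" part of the question: by default, an observable built from an array reads the array when subscribe() is called, emits every element it finds at that moment, and then completes. So 6 (pushed before subscribing) is emitted, while 7 (pushed after completion) is never read. The plain JavaScript sketch below models only that behavior. fromArray is a hypothetical simplified stand-in, not the real Observable.from, and received and completed are illustrative names.

```javascript
// Simplified stand-in for Observable.from(array); NOT the real RxJS implementation.
// It models one thing only: the array is read synchronously, at subscribe time.
function fromArray(arr) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      // Nothing is read before this point, so items pushed
      // before subscribe() (such as 6) are still seen.
      for (var i = 0; i < arr.length; i++) {
        onNext(arr[i]);
      }
      // The sequence ends as soon as the loop finishes.
      onCompleted();
    }
  };
}

var arr = [1, 2, 3, 4, 5];
var arraysource = fromArray(arr);
var received = [];
var completed = false;

arr.push(6); // before subscribe: will be emitted

arraysource.subscribe(
  function (x) { received.push(x); console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { completed = true; console.log('onCompleted'); }
);

arr.push(7); // after completion: nothing is reading the array any more
```

Running this prints onNext for 1 through 6 followed by onCompleted, matching the output in the question. The array ends up holding 7, but no observer is notified of it.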
