Maurits Rijk - 2 months ago
Node.js Question

Idiomatic way in RxJS to calculate the maximum of a stream of values

Calculating the maximum of a fixed stream is straightforward, for example:

var source = Rx.Observable.from([1,3,5,7,9,2,4,6,8]).max();

However, this only generates one value (9 in this case).

I would like to generate the sequence of maximum values seen so far instead, emitting a value each time the Observable produces a new maximum. So the output would be something like [1, 3, 5, 7, 9].

My idea so far has been to combine the output stream of maximum values with the input stream (using combineLatest) and compare the two values. However, this seems more complicated than it should be.


You can use the scan operator like this:

function max(a, b) { return a > b ? a : b; }

// The seed 0 assumes the values are non-negative.
var max$ = source$.scan(max, 0);

scan gives you a stream that follows a relation of the type On+1 = Sn+1 = f(Sn, In+1), where In is the nth input of the source stream, On is the nth output, and Sn is the value accumulated over the sequence so far. It is similar to the reduce operator for arrays, except that every intermediate accumulated value is emitted rather than only the final one.
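To make the relation concrete, here is a minimal sketch in plain JavaScript that mimics scan on an array instead of an Observable. The helpers scan and distinctUntilChanged below are hypothetical array stand-ins written for illustration, not the RxJS operators themselves. The sketch also shows that the running maximum repeats 9 for the trailing inputs, so getting exactly [1, 3, 5, 7, 9] needs a step that drops consecutive duplicates.

```javascript
// Hypothetical helper: an array analogue of scan. It returns every
// intermediate accumulator value instead of only the final one.
function scan(values, accumulator, seed) {
  const out = [];
  let acc = seed;
  let hasAcc = arguments.length >= 3;
  for (const value of values) {
    // Without a seed, the first value becomes the initial accumulator.
    acc = hasAcc ? accumulator(acc, value) : value;
    hasAcc = true;
    out.push(acc);
  }
  return out;
}

// Hypothetical helper: drop a value when it equals the previous one.
function distinctUntilChanged(values) {
  return values.filter((v, i) => i === 0 || v !== values[i - 1]);
}

function max(a, b) { return a > b ? a : b; }

const input = [1, 3, 5, 7, 9, 2, 4, 6, 8];
const runningMax = scan(input, max);                // [1, 3, 5, 7, 9, 9, 9, 9, 9]
const newMaxima = distinctUntilChanged(runningMax); // [1, 3, 5, 7, 9]
console.log(runningMax, newMaxima);
```

With RxJS itself, the equivalent would presumably be source$.scan(max).distinctUntilChanged(). Omitting the seed also avoids the problem that a seed of 0 would be reported as the maximum of an all-negative stream.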