calbear47 - 29 days ago
Node.js Question

Map three different functions to Observable in Node.js

I am new to RxJS. I want to follow best practices if possible.

I am trying to perform three distinct functions on the same data that is returned in an observable. Following the 'streams of data' concept, I keep on thinking I need to split this Observable into three streams and carry on.

Here is my code, so I can stop talking abstractly:

// NotEmptyResponse splits the stream in two, depending on whether I get an empty observable back.
let base_subscription = RxNode.fromStream(siteStream).partition(NotEmptyResponse);

// Success Stream to perform further actions upon.
let successStream = base_subscription[0];

// The Empty stream for error reporting
let failureStream = base_subscription[1];

// Code works up until this point. I don't know how to split into 3 different streams.
successStream.map(grabData) // Async action that returns data
/*** Perform 3 separate actions upon the data that .map(grabData) returned ***/

How can I split this data stream into three, and map each instance of the data to a different function?


In fact, the partition() operator internally just calls the filter() operator twice: first to create an Observable from the values that match the predicate, and then to create one from the values that do not.

So you can do the exact same thing with the filter() operator:

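// Filter the Observable itself, not the array that partition() returned.
// predicate1..predicate3 are placeholders for your own test functions.
let data = successStream.map(grabData);
let obs1 = data.filter(val => predicate1(val));
let obs2 = data.filter(val => predicate2(val));
let obs3 = data.filter(val => predicate3(val));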

Now you have three Observables, each emitting only the values that match its predicate. From there you can carry on with your existing code.
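
To make the "several filters on one source" idea concrete, here is a minimal sketch that runs without RxJS installed. Everything in it is an assumption made up for illustration: `makeObservable` is a tiny stand-in for an Rx Observable (not the library's implementation), and `isSmall`, `isMedium`, and `isLarge` are hypothetical predicates. It only shows that each filtered stream receives the subset of values matching its own predicate.

```javascript
// Hypothetical stand-in for an Rx Observable (NOT the RxJS API):
// subscribe() pushes every value from the producer to the observer callback.
function makeObservable(producer) {
  return {
    subscribe(onNext) {
      producer(onNext);
    },
    // filter() returns a new observable that forwards only matching values.
    filter(predicate) {
      return makeObservable(onNext =>
        producer(value => {
          if (predicate(value)) onNext(value);
        })
      );
    },
  };
}

// Source emitting 1..6 synchronously.
const source = makeObservable(onNext => {
  for (let i = 1; i <= 6; i++) onNext(i);
});

// Three hypothetical predicates, one per output stream.
const isSmall = n => n <= 2;
const isMedium = n => n > 2 && n <= 4;
const isLarge = n => n > 4;

const small = [];
const medium = [];
const large = [];

// Each filtered stream gets its own subscription to the same source.
source.filter(isSmall).subscribe(n => small.push(n));
source.filter(isMedium).subscribe(n => medium.push(n));
source.filter(isLarge).subscribe(n => large.push(n));

console.log(small, medium, large);
```

If you want all three functions to see every value instead of a subset, the same shape should work with a different transformation per stream in place of a different predicate.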


Just be aware that calling subscribe() triggers the generation of values from the source Observable, so each of the three subscriptions may run the source again. This is not always the case; it depends on what kind of Observable you use. See “Hot” and “Cold” Observables in the documentation. The connect() operator might be useful, depending on your use case.
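
The difference can be sketched without RxJS. In the snippet below, `makeCold` and `publish` are hypothetical helpers written only for this illustration; they imitate the behaviour of a cold Observable and of a publish()/connect() pair, and are not the library's code. The point is only to show how many times the producer runs in each case.

```javascript
// Hypothetical stand-in for a cold Rx Observable (NOT the RxJS API):
// the producer runs again for every subscribe() call.
function makeCold(producer) {
  return { subscribe: onNext => producer(onNext) };
}

// Hypothetical publish()-like helper: it collects observers first and
// subscribes to the source only once, when connect() is called.
function publish(cold) {
  const observers = [];
  return {
    subscribe: onNext => {
      observers.push(onNext);
    },
    connect: () => cold.subscribe(value => observers.forEach(o => o(value))),
  };
}

let producerRuns = 0;
const source = makeCold(onNext => {
  producerRuns++; // stands in for an expensive step such as grabData
  onNext("data");
});

// Cold: three subscriptions trigger the producer three times.
const coldResults = [];
source.subscribe(v => coldResults.push("a:" + v));
source.subscribe(v => coldResults.push("b:" + v));
source.subscribe(v => coldResults.push("c:" + v));
const runsWhenCold = producerRuns;

// Shared: three subscriptions, but the producer runs once, on connect().
producerRuns = 0;
const shared = publish(source);
const sharedResults = [];
shared.subscribe(v => sharedResults.push("a:" + v));
shared.subscribe(v => sharedResults.push("b:" + v));
shared.subscribe(v => sharedResults.push("c:" + v));
shared.connect();
const runsWhenShared = producerRuns;

console.log({ runsWhenCold, runsWhenShared, sharedResults });
```

If grabData is expensive or has side effects, this is the reason to look at sharing one subscription rather than subscribing to the source three times.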