I am new to Rxjs. I want to follow best practices if possible.
I am trying to perform three distinct functions on the same data that is returned in an observable. Following the 'streams of data' concept, I keep on thinking I need to split this Observable into three streams and carry on.
Here is my code, so I can stop talking abstractly:
// NotEmptyResponse splits the stream in 2 to account based on whether I get an empty observable back.
let base_subscription = RxNode.fromStream(siteStream).partition(NotEmptyResponse);
// Success Stream to perform further actions upon.
let successStream = base_subscription;
// The Empty stream for error reporting
let failureStream = base_subscription;
//Code works up until this point. I don't know how to split to 3 different streams.
.map(grabData)// Async action that returns data
/*** Perform 3 separate actions upon data that .map(grabData) returned **/
partition() operator internally just calls
filter() operator twice. First to create an Observable from values matching the
predicate and then for values not matching the
So you can do the exact same thing with filter() operator:
let obs1 = base_subscription.filter(val => predicate1); let obs2 = base_subscription.filter(val => predicate2); let obs3 = base_subscription.filter(val => predicate3);
Now you have three Observables, each of them emitting only some specific values. Then you can carry on with your existing code:
obs2.filter(isSite) .map(grabData) .subscribe();
Just be aware that calling
subscribe() triggers the generating values from the source Observable. This doesn't have to be always like this depending on what Observable you use. See “Hot” and “Cold” Observables in the documentation. Operator
connect() might be useful for you depending on your usecase.