Aaronius Aaronius - 2 months ago 14
Javascript Question

Rx BehaviorSubject + scan pushing prior event to new subscriber?

I want to have an stream to which I can push reducer functions. Each time a reducer function is pushed, a state object should be passed to the reducer, the reducer should return a modified state value, and the updated state should be pushed to subscribers. I'm hoping my code can explain:

import Rx from 'rx';
import { Map } from 'immutable';

let initialState = Map({ counter: 0 });

export let upstream = new Rx.BehaviorSubject(Rx.helpers.identity);
export let downstream = upstream.scan((state, reducer) => {
return reducer(state);
}, initialState);

let increment = state => {
return state.update('counter', counter => counter + 1);
};

upstream.onNext(increment);

downstream.subscribe(state => {
console.log('subscriptionA', state.get('counter'));
});

upstream.onNext(increment);

setTimeout(() => {
downstream.subscribe(state => {
console.log('subscriptionB', state.get('counter'));
});
}, 3000);


Here's the output I see:

subscriptionA 1
subscriptionA 2
subscriptionB 1


while I was hoping to see:

subscriptionA 1
subscriptionA 2
subscriptionB 2


Obviously I'm missing something fundamental here. It seems the
BehaviorSubject
is supposed to retain the latest value for new subscribers which would make me think that when
subscriptionB
subscribes to
downstream
that it would get the latest reduced value, but it looks like having the
.scan
in the middle fouls things up...or something.

What's going on here and how to I accomplish what I'm trying to accomplish? Thanks!

Answer

Can you try to see if everything is conform to your expectations if you replace

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState);

by

export let downstream = upstream.scan((state, reducer) => {
  return reducer(state);
}, initialState).shareReplay(1);

jsfiddle here : http://jsfiddle.net/cqaumutp/

If so, you are another victim of the hot vs. cold nature of Rx.Observable, or maybe more accurately the lazy instantiation of observables.

In short (not so short), what happens everytime you do a subscribe, is that a chain of observables is created by going upstream the chain of operators. Each operator subscribes to its source, and returns another observable up to the starting source. In your case, when you subscribe to scan, scan subscribes to upstream which is the last one. upstream being a subject, on subscription it just registers the subscriber. Other sources would do other things (like register a listener on a DOM node, or a socket, or whatever).

The point here is that every time you subscribe to the scan, you start anew, i.e. with the initialState. If you want to use the values from the first subscription to the scan, you have to use the share operator. On the first subscription to the share, it will pass your subscription request on to the scan. On the second and subsequent ones, it will not, it will register it, and forward to the associated observer all the values coming from the scan firstly subscribed to.

Comments