Javascript Question

Rx (JS): how to update subscriptions in flatMap stream

I have a state$ stream that constains messages$s which is array of messages$ stream. State$ is updated and new messages$ appear.

I want subscriber to handle messages from all messages$ in one sinle stream an I want this this stream contain only correct events.

I try to flatMap merged messages$ every time, but got the problem that old messages$s (which where in previous states$ values) are subsribed multiple timed.

How do I solve this?

let allMessages$ = state$.flatMap(s => {
return Observable.merge(s.messages$s)
console.log('message', x)
// message from single message$ appear multiple times

The problem is that after state$ is updated (with items pushed) old one became to be subscribed multiple times.

state$ --s(1)---------s(2)----
message$s[0]. --m1----m2-----------m4--
message$s[1] ---------------m3--------
allMessages$ --m1----m2-----m3----m4
m1 m4

s(1) - when state has 1 message$, s(2) when second message$ is added
So allMessages$ fire with messages from item1.

What i want is:

state$ --s(1)---------s(2)-----
message$s[0] --m1----m2-----------m4--
message$s[1] ---------------m3--------
allMessages$ --m1----m2-----m3----m4

This fildle shows the situation simplified:


Based on your simplified situation, these are the subscriptions sequences (you can review the answer here for explanation of hot vs. cold observables, and understanding of subscription flows):

  • emission of state1
    • subscription to typing$
  • emission of state2
    • subscription to typing$
    • subscription to typing2$

Because you use flatMap, you have three subscriptions at the same time. If you use flatMapLatest here is what happens :

  • emission of state1
    • subscription to typing$
  • emission of state2
    • 'unsubscription' (is that even english language) from previous stream emitted in flatMapLatest i.e. Rx.Observable.merge(state.items) i.e. typing$
    • subscription to typing$
    • subscription to typing2$

So try replacing flatMap by flatMapLatest and let me know if that solves the problem.

Another way to solve this could also to work with a stream of state changes instead of with the whole state (kind of what redux does for react).