Niklas Fasching Niklas Fasching - 3 months ago 11
Javascript Question

RxJS #zip groups created using #groupBy

I need to zip grouped observables (to form the cartesian product of related groups, but that's not relevant for the question).

When running the below code, only the child observable groups actually emit values inside the #zip - Why is that?

https://jsbin.com/coqeqaxoci/edit?js,console

// Connectable source of 1, 2, 3: it is published here and only started by
// parent.connect() at the bottom of the script.
var parent = Rx.Observable.from([1,2,3]).publish();
// Identity-mapped view of parent, published separately, so it has its own
// connect() call below.
var child = parent.map(x => x).publish();
// The key selector is the value itself, so every distinct value gets its own group.
var groupedParent = parent.groupBy(x => x);
var groupedChild = child.groupBy(x => x);

// zip only hands over a [childGroup, parentGroup] pair once BOTH sources have
// emitted a group. The groups are hot, so a group that emitted its value
// before the pair is delivered has nothing left to give a late subscriber.
Rx.Observable.zip([groupedChild, groupedParent])
.map(groups => {
// NOTE: map is used purely for its side effects here; the callback returns nothing.
groups[0].subscribe(x => console.log('zipped child ' + x)); // -> emitting
groups[1].subscribe(x => console.log('zipped parent ' + x)); // -> not emitting
})
.subscribe();

// Control subscriptions: each group is subscribed as soon as it is emitted,
// so every value is observed.
groupedChild.subscribe(group => {
group.subscribe(value => console.log('child ' + value)); // -> emitting
});

groupedParent.subscribe(group => {
group.subscribe(value => console.log('parent ' + value)); // -> emitting
});

// All subscriptions above are wired up before either published source is connected.
child.connect();
parent.connect();


Edit:
As explained in the answer by user3743222, the groups emitted by groupBy are hot and the subscription to the parent group (groups[1]) happens after the first values have already been emitted. This happens as #zip waits for both groupedChild and groupedParent to emit, the latter of which emits sooner (meaning its groups emit values before the #zip function is run).

Answer

I modified your code as follows:

// Running totals of the group observables seen so far on each side;
// the callbacks built by checkCount() increment them.
var countChild = 0;
var countParent = 0;
/**
 * Creates a callback that logs each value it receives, prefixed with the
 * name of the source that produced it.
 *
 * @param {string} who - Label identifying the emitting source.
 * @returns {function(*): void} Callback that logs `<who> emits : <value>`.
 */
function emits ( who ) {
  return function ( value ) {
    var line = who + " emits : " + value;
    console.log(line);
  };
}
/**
 * Creates a callback that bumps the group counter for one side and then logs
 * both running totals.
 *
 * Only the exact label "parent" increments countParent; any other label is
 * counted as a child group.
 *
 * @param {string} who - "parent" to count parent groups, anything else for child groups.
 * @returns {function(): void} Callback that updates the counters and logs them.
 */
function checkCount ( who ) {
  var isParent = who === "parent";
  return function ( ) {
    if (isParent) {
      countParent++;
    } else {
      countChild++;
    }
    var summary = "Check : Parent groups = " + countParent + ", Child groups = " + countChild;
    console.log(summary);
  };
}
/**
 * Creates a callback that logs a value together with the source it came from
 * and the place in the pipeline where it was observed.
 *
 * @param {string} who - Label of the source being observed.
 * @param {string} where - Label of the observation point.
 * @returns {function(*): void} Callback that logs `Check : <who> : <where> :<value>`.
 */
function check ( who, where ) {
  return function ( value ) {
    var line = "Check : " + who + " : " + where + " :" + value;
    console.log(line);
  };
}
/**
 * Creates a completion callback that announces which observable finished.
 *
 * @param {string} who - Label of the observable being watched.
 * @returns {function(): void} Callback that logs `<who> completed!`.
 */
function completed ( who ) {
  return function () {
    var line = who + " completed!";
    console.log(line);
  };
}
/**
 * Creates a callback that logs a value received from a group observable that
 * was obtained through zip.
 *
 * @param {string} who - Label of the side ('child' or 'parent') the group belongs to.
 * @returns {function(*): void} Callback that logs `zipped <who> <value>`.
 */
function zipped ( who ) {
  return function ( value ) {
    var line = 'zipped ' + who + ' ' + value;
    console.log(line);
  };
}
/**
 * Adds one to the given value using the `+` operator.
 *
 * @param {*} x - Value to increment; a string operand is concatenated with "1".
 * @returns {*} The result of `x + 1`.
 */
function plus1 ( x ) {
  var incremented = x + 1;
  return incremented;
}
/**
 * Error callback for the subscriptions in this example.
 *
 * Logs the error it is given so a failure in the pipeline can be diagnosed,
 * rather than only reporting that "an" error occurred.
 *
 * @param {*} [error] - The error passed to the onError handler, if any.
 * @returns {void}
 */
function err ( error ) {
  if (error === undefined) {
    // No detail available: keep the original bare message.
    console.log('error');
    return;
  }
  console.log('error', error);
}

// Source of the values 1..6, logging each one as "parent emits : <x>".
// It is published here and started by parent.connect() at the bottom.
var parent = Rx.Observable.from([1, 2, 3, 4, 5, 6])
    .do(emits("parent"))
    .publish();
// Pass-through view of parent with its own "child emits : <x>" log line.
// Unlike the question's version it is NOT published (see the commented-out
// line), so connecting parent alone is enough to drive it.
var child = parent
    .map(function ( x ) {return x;})
    .do(emits("child"))
//    .publish();

// Parent values grouped by x % 2 (two groups), each value relabelled "P<x>".
// checkCount("parent") logs the running group totals whenever a new group appears.
var groupedParent = parent
    .groupBy(function ( x ) { return x % 2;}, function ( x ) {return "P" + x;})
    .do(checkCount("parent"))
    .share();

// Child values grouped by x % 3 (three groups), each value relabelled "C<x>".
var groupedChild = child
    .groupBy(function ( x ) { return x % 3;}, function (x) {return "C" + x;})
    .do(checkCount("child"))
    .share();

// zip delivers a [childGroup, parentGroup] pair only once both sources have
// emitted a group. In the logged run the child group of each pair has already
// emitted its first value by then, so C1..C3 are missed, while every parent
// value is seen. Only two pairs form (the parent has two groups), so the
// third child group is never subscribed.
Rx.Observable.zip([groupedChild, groupedParent])
//    .do(function ( x ) { console.log("zip args : " + x);})
    .subscribe(function ( groups ) {
                 // groups[0] is the child group, groups[1] the parent group.
                 groups[0]
                     .do(function ( x ) { console.log("Child group observable emits : " + x);})
                     .subscribe(zipped('child'), err, completed('Child Group Observable'));
                 groups[1]
                     .do(function ( x ) { console.log("Parent group observable emits : " + x);})
                     .subscribe(zipped('parent'), err, completed('Parent Group Observable'));
               }, err, completed('zip'));

// child is no longer a published observable, so only parent is connected.
//child.connect();
parent.connect();

Here is the output :

"parent emits : 1"
"child emits : 1"
"Check : Parent groups = 0, Child groups = 1"
"Check : Parent groups = 1, Child groups = 1"
"Parent group observable emits : P1"
"zipped parent P1"
"parent emits : 2"
"child emits : 2"
"Check : Parent groups = 1, Child groups = 2"
"Check : Parent groups = 2, Child groups = 2"
"Parent group observable emits : P2"
"zipped parent P2"
"parent emits : 3"
"child emits : 3"
"Check : Parent groups = 2, Child groups = 3"
"Parent group observable emits : P3"
"zipped parent P3"
"parent emits : 4"
"child emits : 4"
"Child group observable emits : C4"
"zipped child C4"
"Parent group observable emits : P4"
"zipped parent P4"
"parent emits : 5"
"child emits : 5"
"Child group observable emits : C5"
"zipped child C5"
"Parent group observable emits : P5"
"zipped parent P5"
"parent emits : 6"
"child emits : 6"
"Parent group observable emits : P6"
"zipped parent P6"
"Child Group Observable completed!"
"Child Group Observable completed!"
"Parent Group Observable completed!"
"Parent Group Observable completed!"
"zip completed!"

There are two points to make here:

  1. Behaviour of zip and group by vs. moment of subscription

    • groupBy creates observables as expected, both in parent and child

    With those values, you can check in the log that the child creates three groups and the parent creates two.

    • Zip waits until it has one value from each of the sources you pass as parameters. In your case, that means you subscribe to the child and parent grouped-by observables only once both have been emitted. In the log, you will see "Parent group observable emits : P1" only after the counts match on "Check : Parent groups = 1, Child groups = 1".

    • You then subscribe to both grouped-by observables, and log whatever comes out of them. The problem here is that the parent grouped-by observable has a value to pass on, BUT the child 'group-by' observable was created before and already passed on its value, so when you subscribe after the fact, you cannot see that value - but you will see the next ones.

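    • So, values in [1-3] will generate 3 new child grouped-by observables, and you will not see any of those values, as you subscribe too late. But you will see the later values of the child groups that do get zipped. You can check in the log : "zipped child C4" and "zipped child C5". (C6 does not appear: it belongs to the third child group, which zip never pairs up because the parent only creates two groups.)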

    • You will see all values in the parent grouped-by observables, because you subscribe to them immediately after their creation.

  2. connect and publish

    • I don't have a completely clear understanding of connect and publish, but as your child has the parent as a source, you do not need to delay connection to it. If you connect to the parent, the child will automatically start to emit its values. Hence my modification to your code.

    • That should answer your immediate question, but not your original goal of a cartesian product. Maybe you should formulate that as a separate question and see what answers people can come up with.

Comments