sfrehse - 1 month ago
Node.js Question

RxJS: Working with groupBy and Observable.fromEvent in Node.js

I'm very impressed by RxJS and have just started working with it. However, the following Node.js code does not behave as I expect.

let events = new EventEmitter();
let source = Rx.Observable.fromEvent( events, 'data' );

source
    .groupBy( event => event.type )
    .flatMap( group => group.reduce( ( acc, cur ) => _.merge( acc, cur ), [] ) )
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.emit( 'data', { 'type': 1, msg: 'Test 1' } );
events.emit( 'data', { 'type': 1, msg: 'Test 2' } );
events.emit( 'data', { 'type': 2, msg: 'Test 3' } );


I expect that subscribe produces some output.

Answer

I've created a slightly changed version in a JSBin: http://jsbin.com/cufana/edit?js,console

The problem I see in your code is that your source observable never completes. groupBy emits each group as soon as its first value arrives, but reduce only emits its accumulated result once its group completes, and a group only completes when the source observable completes. An observable created with fromEvent never completes on its own, so nothing ever reaches subscribe.

let events = new Rx.Subject();

events
    .groupBy( event => event.type)
    .flatMap(group => group.reduce((acc, curr) => [...acc, curr], []))
    .subscribe( ( data ) => {
        console.log( data );
    } );


events.next({ 'type': 1, msg: 'Test 1' } );
events.next({ 'type': 1, msg: 'Test 2' } );
events.next({ 'type': 2, msg: 'Test 3' } );
events.complete();

Here you can see I've changed the EventEmitter to a Subject to get a working JSBin without an Angular 2 dependency. I'm actually completing the subject, so the source observable feeding groupBy completes. That completes each group in turn, which pushes the reduced results through.

The rest of your code was essentially correct.
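
To illustrate why the completion signal matters, here is a minimal sketch in plain Node.js, without RxJS, that mimics grouping followed by a reduce. The helper name groupAndReduce and the 'end' event are assumptions made up for this illustration; they are not part of RxJS or of the original code.

```javascript
const EventEmitter = require('events');

// Hypothetical helper (not an RxJS API): collects events per key and only
// hands each collected group to onGroup once the 'end' event fires.
function groupAndReduce(emitter, keyFn, onGroup) {
  const groups = new Map();
  emitter.on('data', (event) => {
    const key = keyFn(event);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(event);
  });
  // Like reduce, nothing is emitted before the source signals completion.
  emitter.once('end', () => {
    for (const group of groups.values()) onGroup(group);
  });
}

const events = new EventEmitter();
const results = [];
groupAndReduce(events, (event) => event.type, (group) => results.push(group));

events.emit('data', { type: 1, msg: 'Test 1' });
events.emit('data', { type: 1, msg: 'Test 2' });
events.emit('data', { type: 2, msg: 'Test 3' });
const countBeforeEnd = results.length; // no completion signal has been sent yet

events.emit('end'); // plays the role of complete() on the Subject
console.log(countBeforeEnd, results);
```

Until 'end' is emitted, the aggregation cannot know that a group is finished, which is the same reason reduce stays silent on a source that never completes.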

If the EventEmitter is indeed from Angular 2, I guess you will have trouble completing it. Can this be done from the child component?
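
If the source cannot be completed, one alternative is to report a running result per group on every event instead of waiting for the end. RxJS's scan operator behaves this way, in contrast to reduce. The following is only a sketch of that idea in plain Node.js; runningGroups is a hypothetical helper for this illustration, not an RxJS API.

```javascript
const EventEmitter = require('events');

// Hypothetical helper: calls onUpdate with the group's contents so far
// on every event, so no completion signal is needed.
function runningGroups(emitter, keyFn, onUpdate) {
  const groups = new Map();
  emitter.on('data', (event) => {
    const key = keyFn(event);
    // Build a new array each time so earlier snapshots stay unchanged.
    const soFar = [...(groups.get(key) || []), event];
    groups.set(key, soFar);
    onUpdate(soFar);
  });
}

const events = new EventEmitter();
const updates = [];
runningGroups(events, (event) => event.type, (soFar) => updates.push(soFar));

events.emit('data', { type: 1, msg: 'Test 1' });
events.emit('data', { type: 1, msg: 'Test 2' });
events.emit('data', { type: 2, msg: 'Test 3' });

console.log(updates.map((group) => group.map((event) => event.msg)));
```

The trade-off is that the subscriber sees every intermediate state of a group rather than one final value per group.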