dafing dafing - 1 year ago 248
TypeScript Question

RxJS - Collect async operation results

I want to execute an async operation on every element of an array and collect its results in a dictionary. My current approach is:

let asyncOp = () => Rx.Observable.interval(300).take(1);
let dict = {};

Rx.Observable.from(['a', 'b'])
  .mergeMap(el => asyncOp()
    .map(asyncOpRes => dict[el] = asyncOpRes)
    .do(state => console.log('dict state: ', dict))
  )
  .takeLast(1)
  .map(() => dict)
  .subscribe(res => console.log('dict result: ', res));

<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.7/dist/global/Rx.umd.js"></script>

This does what I want, but it feels like an awkward use of the RxJS operators. I'd like help with the following:

  1. Avoid mutating dict. I tried scan() but couldn't work out how to apply it here; the same goes for mergeScan().

  2. Simplify the combination of takeLast and take, if that's possible.

I think I'm missing an RxJS operator that would simplify this.

Answer Source

To "execute an async operation on every element of an array and collect its results in a dictionary", the code can be simplified significantly with the mergeMap and reduce operators. Because reduce emits only once, when the source completes, no takeLast is needed:

import * as Rx from "rxjs/Rx";

const asyncOp = () => Rx.Observable.interval(300).take(1);

Rx.Observable.from(["a", "b"])

    // Perform the async operation on the values emitted from the
    // observable and map the emitted value and async result into
    // an object.
    .mergeMap((key) => asyncOp().map((result) => ({ key, result })))

    // Use reduce to build an object containing the emitted values
    // (the keys) and the async results.
    .reduce((acc, value) => { acc[value.key] = value.result; return acc; }, {})
    .subscribe((value) => { console.log(value); });
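
On the first point in the question (avoiding mutation): the reduce callback above still writes into its accumulator. If that matters, one option is a callback that returns a fresh object each time. The following is only a minimal sketch under some assumptions: the names collect, KeyedResult and Dict are made up for illustration and are not part of RxJS, and a plain array stands in for the values coming out of mergeMap so the snippet runs without any library.

```typescript
// Shape of the values produced by the mergeMap step above.
// KeyedResult and Dict are hypothetical names used only in this sketch.
interface KeyedResult {
  key: string;
  result: number;
}

type Dict = { [key: string]: number };

// Hypothetical helper (not an RxJS API): returns a new dictionary on every
// call instead of writing into the accumulator it was given.
function collect(acc: Dict, value: KeyedResult): Dict {
  return { ...acc, [value.key]: value.result };
}

// Stand-in for the values emitted after mergeMap. interval(300).take(1)
// emits 0, so each result is 0 here.
const emitted: KeyedResult[] = [
  { key: "a", result: 0 },
  { key: "b", result: 0 },
];

const seed: Dict = {};

// Array.prototype.reduce accepts the same (accumulator, value) callback shape.
const finalDict = emitted.reduce(collect, seed);

console.log(finalDict); // { a: 0, b: 0 }
console.log(seed);      // {} (the seed object is left untouched)
```

In the observable chain, this would presumably be passed as .reduce(collect, {}) in place of the inline callback. Copying the accumulator on every step costs more than mutating it, so for large dictionaries the mutating version in the answer may still be the pragmatic choice.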