thebobblob - 17 days ago
AngularJS Question

How to use RxJS observables in sequential order?

Here's the deal: I have an HTTP GET request that returns a JSON list of objects. Using RxJS, I subscribe to receive the data of that list. Now, for each object in that list, I want to perform another HTTP request and place the result of that request in an array.

So far I've been able to do this, but I can't figure out how to keep the results in the order of the initial list. This is probably because the requests run asynchronously, so the results arrive in whatever order the responses come back.

Here's my code:

ngOnInit(): void {
    this.shiftInformationService.getShifts("2016-11-03T06:00:00Z", "2016-11-06T06:00:00Z")
        .subscribe(shifts => {
            shifts.forEach(shift => {
                this.addDataToAreaChart(shift.startDateTime, shift.endDateTime, shift.description);
            });
        });
}

addDataToAreaChart(startDate: string, endDate: string, description: string) {
    this.machineStatisticsService
        .getCumulativeMachineStateDurations(startDate, endDate)
        .subscribe(s => {
            this.areaChartData = [];
            this.areaChartData.push(new AreaChartData(startDate, endDate, description, s));
        });
}


What I want is to maintain the order of calling from the shifts.forEach loop when pushing the data into the areaChartData array.

Any ideas? Help would be appreciated!

UPDATE: SOLVED!

Final code:

ngOnInit(): void {
    var startDate = new Date();
    startDate.setDate(startDate.getDate() - 3);

    this.shiftInformationService.getShifts(DateUtil.formatDate(startDate), DateUtil.formatDate(new Date()))
        .subscribe(shifts => {
            // Reset once per batch of shifts; resetting per result would keep only the last one
            this.areaChartData = [];
            Observable.from(shifts)
                .concatMap((shift) => {
                    return this.machineStatisticsService
                        .getCumulativeMachineStateDurations(shift.startDateTime, shift.endDateTime)
                        .map((result) => {
                            return {
                                "addDataToAreaChartValue": result,
                                "shift": shift
                            }
                        });
                })
                .subscribe(s => {
                    this.areaChartData.push(
                        new AreaChartData(
                            s.shift.startDateTime,
                            s.shift.endDateTime,
                            s.shift.description + ' ' + s.shift.startDateTime.slice(5, 10),
                            s.addDataToAreaChartValue
                        )
                    );
                });
        });
}


Thanks to Michael!

Answer

Use concatMap to process in sequence.

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
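To see what "waiting for each one to complete" buys you, here is a minimal sketch in plain TypeScript that models the idea with Promises rather than RxJS. It is only an analogue of the serialized behaviour, not how concatMap is implemented. The names fakeRequest, runConcurrently, runInSequence and the Job type are made up for this illustration, and the delays stand in for HTTP latency.

```typescript
// Hypothetical stand-in for an HTTP call: resolves with `label` after `ms` milliseconds.
function fakeRequest(label: string, ms: number): Promise<string> {
  return new Promise<string>((resolve) => setTimeout(() => resolve(label), ms));
}

type Job = { label: string; ms: number };

// Start every request at once and push each result as it arrives
// (roughly what subscribing inside forEach does): completion order wins.
async function runConcurrently(jobs: Job[]): Promise<string[]> {
  const results: string[] = [];
  await Promise.all(
    jobs.map((job) =>
      fakeRequest(job.label, job.ms).then((r) => {
        results.push(r);
      })
    )
  );
  return results;
}

// Start each request only after the previous one has finished
// (the serialized behaviour described above): source order is kept.
async function runInSequence(jobs: Job[]): Promise<string[]> {
  const results: string[] = [];
  for (const job of jobs) {
    results.push(await fakeRequest(job.label, job.ms));
  }
  return results;
}

// Assumed sample data: the first job is deliberately the slowest.
const jobs: Job[] = [
  { label: "shift A", ms: 60 },
  { label: "shift B", ms: 5 },
  { label: "shift C", ms: 30 },
];
```

With these delays, runInSequence(jobs) resolves in source order (A, B, C), while runConcurrently(jobs) fills the array in completion order, so the slow "shift A" ends up last. The price of the sequential version is that total time is the sum of all latencies instead of the longest one.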

Use map to append to or transform each value in the observable.

Applies a given project function to each value emitted by the source Observable, and emits the resulting values as an Observable.
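The reason map matters here is that the second request only returns its own result, so the shift that triggered it has to be carried along by hand. Below is a small sketch of that projection. Array.prototype.map is used as a synchronous stand-in for the Observable operator (both apply a function to each value), and the Durations type, the withShift helper and the sample values are assumptions, since the post does not show the service's return type.

```typescript
// Shape of a shift as used in the question; only these three fields are assumed.
interface Shift {
  startDateTime: string;
  endDateTime: string;
  description: string;
}

// Hypothetical result type; the real service's return type is not shown in the post.
type Durations = { [state: string]: number };

// The projection function: wrap a request result together with the shift that produced it.
function withShift(shift: Shift) {
  return (result: Durations) => ({
    addDataToAreaChartValue: result,
    shift: shift,
  });
}

// Sample data, invented for the illustration.
const sampleShift: Shift = {
  startDateTime: "2016-11-03T06:00:00Z",
  endDateTime: "2016-11-03T14:00:00Z",
  description: "Early",
};
const emitted: Durations[] = [{ running: 120, idle: 30 }];

// Array.prototype.map stands in for the Observable `map` operator here.
const projected = emitted.map(withShift(sampleShift));
```

Each element of projected now exposes both the result and the originating shift, which is what the subscribe callback needs to build its chart entry.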

So, you need to do this:

ngOnInit(): void {
    this.shiftInformationService.getShifts("2016-11-03T06:00:00Z", "2016-11-06T06:00:00Z")
        .subscribe(shifts => {
            Rx.Observable.from(shifts) // create observable of each value in array
                .concatMap((shift) => { // process in sequence
                    return this.addDataToAreaChart(
                        shift.startDateTime, 
                        shift.endDateTime, 
                        shift.description
                    ).map((result) => {
                        return {
                           "addDataToAreaChartValue" : result, // addDataToAreaChart result
                           "shift": shift // append shift object here, so we can access it on subscribe
                        }
                    });
                })
                .subscribe(s => {
                    // this.areaChartData = []; // removed: resetting here would discard the results pushed so far
                    this.areaChartData.push(
                        new AreaChartData(
                            s.shift.startDateTime,
                            s.shift.endDateTime,
                            s.shift.description, 
                            s.addDataToAreaChartValue
                        )
                    );
                });
        });
}

addDataToAreaChart(startDate: string, endDate: string, description: string) {
    return this.machineStatisticsService
        .getCumulativeMachineStateDurations(startDate, endDate);
}