user1202032 user1202032 - 5 days ago 5
TypeScript Question

TypeScript: awaiting multiple subscriptions

I have an Observable that emits a list of IDs. For each ID, I want to get the object that the ID represents, and getting each object gives me another Observable. How do I make sure all objects have been received before continuing?

This is a many-to-many relation in a database.

getExercises(programKey: string): Observable<Array<Exercise>> {
    let source = Observable.create(observer => {
        // ... stuff here ...
        programExercises.subscribe(programExercisesSnapshot => {
            let exercises = Array<Exercise>();
            programExercisesSnapshot.forEach(programExercise => {
                let exercise = this.getExercise(programExercise.ExerciseKey); // Returns Observable<Exercise>
                exercise.subscribe(exerciseSnapshot => exercises.push(exerciseSnapshot)); // TODO: Need to await all these subscriptions before calling observer.next()
            });
            observer.next(exercises);
        });

        return () => { }; // Dispose
    });

    return source;
}


Thanks in advance!

Answer

Well, apart from the fact that an Observable emitting an Array looks a bit odd, this is how I'd do it:

getExercises(programKey: string): Observable<Array<Exercise>> {
    // ... stuff here ...
    return programExercises
        // assuming that programExercisesSnapshot is an array or can be easily converted to one
        .flatMap(programExercisesSnapshot => Observable
            // if fromArray() is not available in your RxJS version, Observable.from() also accepts an array
            .fromArray(programExercisesSnapshot)
            .flatMap(programExercise => this.getExercise(programExercise.ExerciseKey))
            .bufferCount(programExercisesSnapshot.length));
}

Now let's see how this is supposed to work. Let's start with the inner pipeline.

  1. we create an observable from the programExercisesSnapshot array, which emits its elements one by one;
  2. using flatMap(), we replace each element in the flow with the results of the observable returned by the this.getExercise(programExercise.ExerciseKey) call;
  3. bufferCount() gathers programExercisesSnapshot.length elements into a single array and emits it as the result.

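So, this whole pipeline emits arrays of the results of the this.getExercise() calls. Note that this relies on each this.getExercise() observable emitting exactly one value; otherwise the count passed to bufferCount() will not line up with the batches.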

Now, the outer flatMap() does the following:

  1. it takes the batches emitted by programExercises;
  2. replaces them with the results (i.e. arrays) emitted by the previously described observable;
  3. and emits those results as its own.
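
To make the two levels concrete, here is a minimal sketch of the same pipeline that runs without any dependencies. MiniObservable is a hypothetical, synchronous stand-in written only for this illustration (it is not the RxJS implementation), and the getExercise() lookup and sample keys are made up. It only mirrors the shape of from(), flatMap() and bufferCount() as described above.

```typescript
// Hypothetical stand-in for an Rx observable: just enough to run without rxjs.
type Observer<T> = { next: (value: T) => void; complete: () => void };

class MiniObservable<T> {
    constructor(private producer: (observer: Observer<T>) => void) {}

    subscribe(next: (value: T) => void, complete: () => void = () => {}): void {
        this.producer({ next, complete });
    }

    // Emits the array's elements one by one, then completes.
    static from<T>(items: T[]): MiniObservable<T> {
        return new MiniObservable<T>(observer => {
            items.forEach(item => observer.next(item));
            observer.complete();
        });
    }

    // Subscribes to the inner observable built for each element and forwards its values.
    flatMap<R>(project: (value: T) => MiniObservable<R>): MiniObservable<R> {
        return new MiniObservable<R>(observer => {
            let active = 1; // the source itself counts as one active subscription
            const done = () => { if (--active === 0) observer.complete(); };
            this.subscribe(value => {
                active++;
                project(value).subscribe(inner => observer.next(inner), done);
            }, done);
        });
    }

    // Collects `size` values into an array, emits it, and starts a new buffer.
    bufferCount(size: number): MiniObservable<T[]> {
        return new MiniObservable<T[]>(observer => {
            let buffer: T[] = [];
            this.subscribe(value => {
                buffer.push(value);
                if (buffer.length === size) {
                    observer.next(buffer);
                    buffer = [];
                }
            }, () => {
                if (buffer.length > 0) observer.next(buffer);
                observer.complete();
            });
        });
    }
}

interface Exercise { key: string; name: string; }
interface ProgramExercise { ExerciseKey: string; }

// Made-up lookup standing in for this.getExercise(); emits exactly one value.
function getExercise(key: string): MiniObservable<Exercise> {
    return MiniObservable.from([{ key: key, name: 'Exercise ' + key }]);
}

function getExercises(programExercises: MiniObservable<ProgramExercise[]>): MiniObservable<Exercise[]> {
    return programExercises.flatMap(snapshot =>
        MiniObservable.from(snapshot)                    // inner step 1
            .flatMap(pe => getExercise(pe.ExerciseKey))  // inner step 2
            .bufferCount(snapshot.length));              // inner step 3
}

// Two snapshots go in; one array of exercises per snapshot comes out.
const batches: Exercise[][] = [];
getExercises(MiniObservable.from([
    [{ ExerciseKey: 'a' }, { ExerciseKey: 'b' }],
    [{ ExerciseKey: 'c' }]
])).subscribe(batch => batches.push(batch));
```

Because this stand-in is synchronous, the results arrive in source order. With real asynchronous observables, flatMap() forwards values in the order they arrive, so the order inside each array is not guaranteed to match the order of the IDs.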

Profit! :)

One more thing missing from your original solution is cleanup. When you call programExercises.subscribe() yourself, you also need to unsubscribe from it manually. Doing it the way I suggested eliminates that need: rxjs will take care of it for you.
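
For comparison, here is a rough sketch of the bookkeeping the hand-written version would need. It assumes subscriptions expose an unsubscribe() method, as they do in RxJS 5 and later. SubscriptionBag and fakeSubscription are hypothetical names invented for this illustration, and the fake subscriptions only record that they were released.

```typescript
// Anything with an unsubscribe() method, e.g. what subscribe() returns.
interface Unsubscribable { unsubscribe(): void; }

// Hypothetical helper: collects subscriptions so one call releases them all.
class SubscriptionBag implements Unsubscribable {
    private subscriptions: Unsubscribable[] = [];

    add(subscription: Unsubscribable): void {
        this.subscriptions.push(subscription);
    }

    unsubscribe(): void {
        this.subscriptions.forEach(s => s.unsubscribe());
        this.subscriptions = [];
    }
}

// Fake subscription that records when it is released, for demonstration only.
const released: string[] = [];
const fakeSubscription = (name: string): Unsubscribable => ({
    unsubscribe: () => { released.push(name); }
});

const bag = new SubscriptionBag();
bag.add(fakeSubscription('programExercises')); // the outer subscribe()
bag.add(fakeSubscription('exercise a'));       // one per inner subscribe()
bag.add(fakeSubscription('exercise b'));

// This is roughly what the dispose function returned from Observable.create() would have to do.
const dispose = () => bag.unsubscribe();
dispose();
```

In the original code the dispose function is empty, so neither the outer subscription nor any of the inner ones would be released this way.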

Also, as I said at the beginning, emitting an Array from the observable looks a little bit odd. I hope you have a good reason for doing that. :) Otherwise, you may want to consider converting it into an observable that emits the elements one by one as well.
