Craig Russell Craig Russell - 1 month ago 9
Java Question

Combine a list of Observables and wait until all completed

TL;DR
How to convert

Task.whenAll(List<Task>)
into
RxJava
?

My existing code uses Bolts to build up a list of asynchronous tasks and waits until all of those tasks finish before performing other steps. Essentially, it builds up a
List<Task>
and returns a single
Task
which is marked as completed when all tasks in the list complete, as per the example on the Bolts site.

I'm looking to replace
Bolts
with
RxJava
and I'm assuming this method of building up a list of async tasks (size not known in advance) and wrapping them all into a single
Observable
is possible, but I don't know how.

I've tried looking at
merge
,
zip
,
concat
etc... but can't get to work on the
List<Observable>
that I'd be building up as they all seem geared to working on just two
Observables
at a time if I understand the docs correctly.

I'm trying to learn
RxJava
and am still very new to it so forgive me if this is an obvious question or explained in the docs somewhere; I have tried searching. Any help would be much appreciated.

Answer

It sounds like you're looking for the Zip operator.

There are a few different ways of using it, so let's look at an example. Say we have a few simple observables of different types:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

The simplest way to wait for them all is something like this:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Note that in the zip function, the parameters have concrete types that correspond to the types of the observables being zipped.

Zipping a list of observables is also possible, either directly:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...or by wrapping the list into an Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

However, in both of these cases, the zip function can only accept a single Object[] parameter since the types of the observables in the list are not known in advance as well as their number. This means that that the zip function would have to check the number of parameters and cast them accordingly.

Regardless, all of the above examples will eventually print 1 Blah true

EDIT: When using Zip, make sure that the Observables being zipped all emit the same number of items. In the above examples all three observables emitted a single item. If we were to change them to something like this:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Then 1, Blah, True and 2, Hello, True would be the only items passed into the zip function(s). The item 3would never be zipped since the other observables have completed.

Comments