IKIKN2, 4 years ago
Android Question

Making a call with RX and then making multiple parallel calls from the result of the first call

I have to make a call to an API that returns a list of items. For each item of this list, I have to make a call to another API (if the list returns 8 items, I will have to make 8 parallel calls).

I finally have to return a list that I will create with the results of each of these 8 parallel calls.

How can I do that with RxJava? I think I have to use flatMap to transform the result of the first call into a list of Observables, and then use the zip operator to make the parallel calls, but I'm not sure.

Please note that I'm using RxJava 2, without lambda expressions.

Thanks !

Answer Source

You can do it like this, for example. defer() postpones fetching the items until subscription, and then creates an Observable that emits the items of the list one by one.
flatMap() then creates, for each item, an Observable that fetches its data, so the resulting stream emits Data objects. To collect them, use toList(), which emits a single object (a List) containing all the Data fetched by the inner Observables.

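Note that for the calls to actually run in parallel, the Observable returned by fetchDataFromItem() must itself subscribe on Schedulers.io(), even if the whole stream is already subscribed on io. Otherwise the inner Observables are subscribed one after another on the same thread.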

Observable.defer(new Callable<ObservableSource<Item>>() {
            @Override
            public ObservableSource<Item> call() throws Exception {
                List<Item> items = getItems();
                return Observable.fromIterable(items);
            }
        })
        .flatMap(new Function<Item, ObservableSource<Data>>() {
            @Override
            public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
                return fetchDataFromItem(item);
            }
        })
        .toList()
        .subscribe(new Consumer<List<Data>>() {
            @Override
            public void accept(@NonNull List<Data> objects) throws Exception {
                //do something with the list of all fetched data
            }
        });
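
The answer does not show fetchDataFromItem() itself, so here is a minimal sketch of how it might apply subscribeOn(Schedulers.io()) as the note above requires. It assumes RxJava 2 on the classpath. String stands in for both Item and Data, and loadBlocking() and fetchAll() are hypothetical names invented for this illustration, not part of the original code.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.Callable;

class ParallelFetchSketch {

    // Hypothetical stand-in for the blocking per-item API call.
    static String loadBlocking(String item) throws InterruptedException {
        Thread.sleep(100); // simulate network latency
        return "data-for-" + item;
    }

    // Wraps the blocking call and moves it to the io scheduler, so each
    // inner Observable created by flatMap() runs on its own io worker.
    static Observable<String> fetchDataFromItem(final String item) {
        return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return loadBlocking(item);
            }
        }).subscribeOn(Schedulers.io());
    }

    // Fetches data for all items concurrently and blocks until the list
    // is complete. Results are collected in completion order, which is
    // not necessarily the input order.
    static List<String> fetchAll(List<String> items) {
        return Observable.fromIterable(items)
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String item) {
                        return fetchDataFromItem(item);
                    }
                })
                .toList()
                .blockingGet();
    }
}
```

blockingGet() is used here only to keep the sketch easy to run outside Android; in an app you would normally subscribe as in the code above. Because flatMap() merges results as they complete, the list may come back in a different order than the items; if the original order matters, concatMapEager() is one operator to look at.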

UPDATE:

If fetching the items already returns an Observable, defer() can be replaced with flatMapIterable(), which takes the single List of items and transforms it into an Observable that emits the items individually:

getItemsObservable()
        .flatMapIterable(new Function<List<Item>, Iterable<Item>>() {
            @Override
            public Iterable<Item> apply(@NonNull List<Item> items) throws Exception {
                return items;
            }
        })
        .flatMap(new Function<Item, ObservableSource<Data>>() {
            @Override
            public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
                return fetchDataFromItem(item);
            }
        })
        .toList()
        .subscribe(new Consumer<List<Data>>() {
            @Override
            public void accept(@NonNull List<Data> objects) throws Exception {
                //do something with the list of all fetched data
            }
        });