Mamadou Mamadou - 1 month ago 20
Android Question

RxJava flatmap chaining requests

I am using Retrofit with RxJava in an app that fetches RSS feeds. The RSS doesn't contain all the information I need, so I use jsoup to parse each item's link and retrieve the image and the article's description. This is how I am doing it now:

public Observable<Rss> getDumpData() {
    return newsAppService.getDumpData()
            .flatMap(rss -> Observable.from(rss.channel.items)
                    .observeOn(Schedulers.io())
                    .flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get())
                            .observeOn(Schedulers.io())
                            .map(document -> document.select("div[itemprop=image] > img").first())
                            .doOnNext(element -> item.image = element.attr("src"))
                    )))
            .defaultIfEmpty(rss)
            .ignoreElements()
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread());
}


I am getting an error on this line:
defaultIfEmpty(rss)

It doesn't recognize the rss from the flatMap. When I move
defaultIfEmpty(rss)
inside the flatMap brackets, I get another error saying that the return type must be changed to Element. Is there any solution?

Answer

You can't use the parameter of one operator's lambda (here, the rss parameter of the flatMap lambda) as an argument to another operator (defaultIfEmpty) outside that lambda. The name rss is only in scope inside the lambda body.
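
The scoping rule is ordinary Java rather than anything specific to RxJava, so it can be sketched with plain java.util.stream as a stand-in. In this minimal sketch, Rss and Item are simplified, hypothetical versions of the classes in the question, and imageFor is a hypothetical placeholder for the jsoup lookup that performs no network access. Anything that needs the lambda parameter has to live inside the lambda, which then returns the enriched object to the next operator:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class LambdaScopeDemo {
    // Simplified, hypothetical stand-ins for the question's model classes.
    static final class Item {
        final String link;
        String image;
        Item(String link) { this.link = link; }
    }

    static final class Rss {
        final List<Item> items = new ArrayList<>();
    }

    // Hypothetical placeholder for the jsoup lookup; it does no network access.
    static String imageFor(String link) {
        return link + "/image.png";
    }

    static List<Rss> enrich(List<Rss> feeds) {
        return feeds.stream()
                .map(rss -> {
                    // `rss` is in scope only between these braces.
                    rss.items.forEach(item -> item.image = imageFor(item.link));
                    return rss; // hand the enriched object to the next operator
                })
                // Referring to `rss` here would not compile: the name no longer exists.
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Rss rss = new Rss();
        rss.items.add(new Item("http://example.com/a"));
        System.out.println(enrich(Arrays.asList(rss)).get(0).items.get(0).image);
    }
}
```

The same shape applies to an Observable chain: do the per-item work inside the flatMap lambda and emit rss from there, instead of reaching for rss in a later operator.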

First, create a helper function to keep the main reactive stream cleaner:

// Loads the image URL for every item and emits the updated items as a single list.
private Observable<List<Item>> getDetails(List<Item> items) {
    return Observable.from(items)
            .observeOn(Schedulers.io())
            .flatMap(Checked.f1(item ->
                    // zip pairs the item with its parsed element, so the item reference is not lost
                    Observable.zip(
                            Observable.just(item),
                            Observable.just(Jsoup.connect(item.link).get())
                                    .observeOn(Schedulers.io())
                                    .map(document -> document.select("div[itemprop=image] > img").first()),
                            (itemInner, element) -> {
                                itemInner.image = element.attr("src");
                                return itemInner;
                            }
                    )
            ))
            .toList();
}

Then rewrite the main function:

newsAppService.getDumpData()
        .flatMap(rss ->
                Observable.zip(
                        Observable.<Rss>just(rss),
                        getDetails(rss.channel.items),
                        (rssInner, items) -> {
                            rssInner.channel.items = items;
                            return rssInner;
                        })
                        // if loading the details fails, fall back to the original feed
                        .onErrorResumeNext(throwable -> Observable.just(rss))
        )
        // do the work on an I/O thread and deliver the result on the main thread
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

I hope I understood your aim correctly. It may not work as written, since I am unable to test it, but I hope you get the idea. The reason I used the zip function is that it keeps a reference to the item or rss currently being processed, so you can't lose it.
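
The idea of pairing each item with its own result can also be sketched without RxJava. The example below is an analogy only: it uses CompletableFuture.thenCombine from the JDK in place of Observable.zip, and both Item and fetchImageUrl are hypothetical stand-ins (the latter does no network access).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ZipAnalogyDemo {
    // Simplified, hypothetical stand-in for the question's Item class.
    static final class Item {
        final String link;
        String image;
        Item(String link) { this.link = link; }
    }

    // Hypothetical stand-in for the jsoup request.
    static String fetchImageUrl(String link) {
        return link + "/image.png";
    }

    // Combines the item with its asynchronously fetched image,
    // much like zip combines the values of two sources.
    static CompletableFuture<Item> withImage(Item item) {
        return CompletableFuture.completedFuture(item)
                .thenCombine(
                        CompletableFuture.supplyAsync(() -> fetchImageUrl(item.link)),
                        (itemInner, image) -> {
                            itemInner.image = image;
                            return itemInner;
                        });
    }

    static List<Item> withImages(List<Item> items) {
        // Start all lookups first so they can run concurrently.
        List<CompletableFuture<Item>> pending = new ArrayList<>();
        for (Item item : items) {
            pending.add(withImage(item));
        }
        // Then wait for each one, preserving the input order.
        List<Item> result = new ArrayList<>();
        for (CompletableFuture<Item> future : pending) {
            result.add(future.join());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Item> items = withImages(Arrays.asList(
                new Item("http://example.com/a"),
                new Item("http://example.com/b")));
        for (Item item : items) {
            System.out.println(item.link + " -> " + item.image);
        }
    }
}
```

Because the combining function receives both the item and the fetched value, there is no need to capture the item from an outer scope.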
