Ben Smith Ben Smith - 6 months ago 308
Java Question

Paginate Observable results without recursion - RxJava

I've got a pretty standard API pagination problem which you can handle with some simple recursion. Here's a fabricated example:

/**
 * Starts a paginated scan from the beginning.
 *
 * Delegates to {@code scanPage} with no start key and an empty accumulator.
 *
 * @return whatever {@code scanPage} produces for the initial (empty) state
 */
public Observable<List<Result>> scan() {
    // Seed values for the recursion: no key yet, nothing collected yet.
    final Optional<KEY> noStartKey = Optional.empty();
    final List<Result> nothingCollected = ImmutableList.of();
    return scanPage(noStartKey, nothingCollected);
}

/**
 * Fetches one page starting at {@code startKey}, appends its results to {@code results},
 * and recurses while the page reports a further key.
 *
 * The current page is always folded into the accumulator before the stop check, so the
 * final page (the one whose last key is absent) is included in the emitted list, matching
 * the blocking do/while variant of this scan.
 *
 * @param startKey key to resume from; empty for the first page
 * @param results  results accumulated from the pages fetched so far
 * @return an Observable that emits the accumulated list once a page with no last key is seen
 */
private Observable<List<Result>> scanPage(Optional<KEY> startKey, List<Result> results) {
    return this.scanner.scan(startKey, LIMIT)
        .flatMap(page -> {
            // Add this page first; returning early with only `results` would drop the last page.
            final List<Result> accumulated = ImmutableList.<Result>builder()
                .addAll(results)
                .addAll(page.getResults())
                .build();
            if (!page.getLastKey().isPresent()) {
                // No further key: this was the final page, emit everything collected.
                return Observable.just(accumulated);
            }
            return scanPage(page.getLastKey(), accumulated);
        });
}


But this can obviously create a massive call stack. How can I do this imperatively while still maintaining the Observable stream?

Here's an imperative blocking example:

/**
 * Blocking variant: fetches pages one after another and collects all of their results.
 *
 * At least one page is always requested; fetching stops after the first page whose
 * last key is absent.
 *
 * @return an immutable list containing the results of every fetched page, in fetch order
 */
public List<Result> scan() {
    final ImmutableList.Builder<Result> collected = ImmutableList.builder();
    Optional<String> nextKey = Optional.empty();

    while (true) {
        final Page current = this.scanner.scan(nextKey);
        collected.addAll(current.getResults());
        nextKey = current.getLastKey();
        if (!nextKey.isPresent()) {
            // The page reported no further key, so it was the last one.
            break;
        }
    }

    return collected.build();
}

Answer

It's not the most elegant of solutions, but you can use subjects and side effects. See the toy example below:

import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import rx.subjects.*;

/**
 * Toy demonstration of paginating through an Observable without recursion.
 *
 * Each page is a list whose first element is the key of the next page; an empty
 * string as that first element means there is no next page. A subject carries the
 * keys, and emitting a page feeds the following key back into the subject.
 */
public class Pagination {
    /** Fake paged backend: page key -> page contents (element 0 is the next page's key). */
    static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>();

    /**
     * Populates the fake pages, builds the paginated Observable, and prints what two
     * subscribers receive.
     *
     * @param args unused
     * @throws InterruptedException if the final sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        addPage("default", "2", "Maths", "Chemistry");
        addPage("2", "3", "Physics", "Biology");
        addPage("3", "4", "Art");
        addPage("4", "", "Geography");

        Observable<List<String>> paged = Observable.defer(() -> {
            System.out.println("Building Observable");
            // Subject of page keys; each emitted key triggers the fetch of one page.
            ReplaySubject<String> nextKeys = ReplaySubject.<String>create(1);
            Observable<List<String>> pageStream = nextKeys.asObservable().concatMap(aKey -> {
                if (aKey.equals("")) {
                    // Empty key: nothing more to fetch, so complete the key subject.
                    return Observable.<List<String>>empty().doOnCompleted(nextKeys::onCompleted);
                }
                // Emit the page and, as a side effect, queue the key stored in its first element.
                return Observable.just(pages.get(aKey))
                        .doOnNext(fetched -> nextKeys.onNext(fetched.get(0)));
            });
            // Kick off the chain with the first page's key.
            nextKeys.onNext("default");
            return pageStream;
        });

        // Use this if you want to ensure work isn't done again
        Observable<List<String>> cached = paged.cache();
        cached.subscribe(l -> System.out.println("Sub 1 : " + l));
        cached.subscribe(l -> System.out.println("Sub 2 : " + l));
        Thread.sleep(2000L);
    }

    /** Registers a page under {@code key} containing {@code entries} in the given order. */
    private static void addPage(String key, String... entries) {
        ArrayList<String> contents = new ArrayList<String>();
        for (String entry : entries) {
            contents.add(entry);
        }
        pages.put(key, contents);
    }
}

Edited with improvements.

Comments