Pawan Mishra, 4 years ago
Java Question

RxJava and parallel execution of observer code

I have the following code using the RxJava Observable API:

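Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());

observable
    .buffer(batchSize) // batch records into a List<Info>; batchSize is a placeholder
    .observeOn(Schedulers.computation())
    .subscribe(recordInfo -> {
        System.out.println("Running stage2 on thread with id : " + Thread.currentThread().getId());
        for (Info info : recordInfo) {
            // some I/O operation logic
        }
    },
    exception -> {
        // error handling
    },
    () -> {
        // completion handling
    });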

I expected the observer code, i.e. the code inside the subscribe() method, to be executed in parallel once I had specified the computation scheduler. Instead, the code is still executed sequentially on a single thread. How can I make the code run in parallel using the RxJava API?

Answer Source

RxJava's asynchronous and multithreaded behavior is often misunderstood. Writing multithreaded operations is simple, but understanding the abstraction is another matter.

A common question about RxJava is how to achieve parallelization, that is, how to emit multiple items concurrently from an Observable. Taken literally, this breaks the Observable Contract, which states that onNext() must be called sequentially and never concurrently by more than one thread. This is why putting a single Observable on the computation scheduler only moves its sequential emissions onto one worker thread.

To achieve parallelism you need multiple Observables, each subscribed on a scheduler, whose results are merged back into a single stream.

This runs in a single thread:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

This runs in multiple threads:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

The code and text above come from this blog post.
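
As a rough illustration of how the flatMap pattern might carry over to the batched processing in the question, here is a minimal sketch. It assumes RxJava 3 (the io.reactivex.rxjava3 packages); on the 1.x line the imports and blocking calls differ. The names ParallelBatches, processBatch and sumInParallel are hypothetical, and the summing logic is only a stand-in for the real I/O work.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ParallelBatches {

    // Hypothetical stand-in for the per-batch I/O logic in the question.
    static int processBatch(List<Integer> batch) {
        int sum = 0;
        for (int value : batch) {
            sum += value;
        }
        return sum;
    }

    // Splits 1..count into batches, processes each batch as its own inner
    // Observable on the computation scheduler, and blocks until all batch
    // results have been merged and added up.
    static int sumInParallel(int count, int batchSize, Set<String> threadNames) {
        return Observable.range(1, count)
                .buffer(batchSize)
                .flatMap(batch -> Observable.just(batch)
                        .subscribeOn(Schedulers.computation())
                        .map(b -> {
                            // Record which worker thread handled this batch.
                            threadNames.add(Thread.currentThread().getName());
                            return processBatch(b);
                        }))
                .reduce(0, Integer::sum)
                .blockingGet();
    }

    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        int total = sumInParallel(100, 10, threadNames);
        System.out.println("Total: " + total);
        System.out.println("Worker threads used: " + threadNames);
    }
}
```

Note that flatMap merges results in whatever order the inner Observables finish, so this approach fits work where batch order does not matter (the sum here is order-independent). The blocking call at the end is only there so that the sketch does not exit before the worker threads finish.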
