mm_857 mm_857 - 1 month ago 10
Java Question

Can I parallelise an operation driven by a flatMap?

Say I have a method like the code below, in which a List is flatMapped to individual strings, each of which has some expensive operation applied to it. Is there any way to parallelise the expensive operations, in the same way that I'd use parallelStream() in Java 8?

// Input data. A plain ArrayList is populated explicitly instead of using
// double-brace initialisation, which declares an anonymous ArrayList subclass.
final List<String> names = new ArrayList<>();
names.add("Ringo");
names.add("John");
names.add("Paul");
names.add("George");

// Baseline pipeline: the list is flattened into individual strings and the
// expensive operation is applied to each of them inside a single map() stage.
Observable.just(names).subscribeOn(Schedulers.io())
        .flatMap(new Func1<List<String>, Observable<String>>() {
            @Override
            public Observable<String> call(final List<String> list) {
                // Observable.from emits every element and then completes, and it
                // honours unsubscription; the hand-written OnSubscribe loop it
                // replaces only ever called onNext.
                return Observable.from(list);
            }
        })
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                // Simulate expensive operation
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack
                    // can still observe that the thread was interrupted.
                    Thread.currentThread().interrupt();
                }
                return s.toUpperCase();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                // Nothing to do once all names have been processed.
            }

            @Override
            public void onError(Throwable e) {
                // Report failures instead of silently dropping them.
                Log.e("RXExample", "Stream failed", e);
            }

            @Override
            public void onNext(String s) {
                Log.v("RXExample", s + " on " + Thread.currentThread().getName());
            }
        });


For completeness, applying the change recommended in the answer looks like the following and works nicely!

// Input data. A plain ArrayList is populated explicitly instead of using
// double-brace initialisation, which declares an anonymous ArrayList subclass.
final List<String> names = new ArrayList<>();
names.add("Ringo");
names.add("John");
names.add("Paul");
names.add("George");

// Upper bound on how many inner Observables flatMap subscribes to at once.
final int maxConcurrency = 5;

// Parallel pipeline: every name gets its own inner Observable that is subscribed
// on an io() worker, so the expensive map() runs on that worker. flatMap merges
// the inner results back into a single stream, forwards inner errors and
// completes once every inner Observable has completed.
Observable.from(names)
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(final String name) {
                return Observable
                        .just(name)
                        .subscribeOn(Schedulers.io())
                        .map(new Func1<String, String>() {
                            @Override
                            public String call(String s) {
                                // Simulate expensive operation
                                try {
                                    Thread.sleep(6000);
                                } catch (InterruptedException e) {
                                    // Restore the interrupt flag so code further up
                                    // the stack can still observe the interruption.
                                    Thread.currentThread().interrupt();
                                }
                                // The transformed value is what travels downstream.
                                return s.toUpperCase();
                            }
                        });
            }
        }, maxConcurrency)
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                // Nothing to do once all names have been processed.
            }

            @Override
            public void onError(Throwable e) {
                // Report failures instead of silently dropping them.
                Log.e("RXExample", "Stream failed", e);
            }

            @Override
            public void onNext(String s) {
                Log.v("RXExample", s + " on " + Thread.currentThread().getName());
            }
        });

Answer

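You can parallelise the work with flatMap, as in the following example: each element is mapped to its own inner Observable that is subscribed on a separate thread, and flatMap merges the results back into one stream. I am using RxJava2 for testing.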

For further explanation, please read about this use of flatMap here: http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html

/**
 * Checks that flatMap runs the per-element work concurrently.
 *
 * <p>Each call to {@code longWork} sleeps for one second and the test waits at
 * most two seconds, so all four values can only arrive in time if the inner
 * Observables are executed in parallel rather than one after another.
 *
 * @throws Exception declared for parity with the original test signature
 */
@Test
public void name() throws Exception {
    // Plain ArrayList instead of double-brace initialisation, which would
    // declare an anonymous subclass capturing the enclosing test instance.
    final List<String> names = new ArrayList<>();
    names.add("Ringo");
    names.add("John");
    names.add("Paul");
    names.add("George");

    // Every element is turned into its own inner Observable subscribed on a
    // fresh thread; flatMap merges whatever those inner Observables emit.
    Observable<String> stringObservable = Observable.fromIterable(names)
            .flatMap(s -> longWork(s)
                    .doOnNext(this::printCurrentThread)
                    .subscribeOn(Schedulers.newThread()));

    TestObserver<String> test = stringObservable.test();

    // assertNoErrors first, so a failure inside longWork is reported as the
    // error itself rather than as a value-count mismatch.
    test.awaitDone(2_000, TimeUnit.MILLISECONDS)
            .assertNoErrors()
            .assertValueCount(4);
}

/**
 * Wraps a simulated slow operation in an Observable.
 *
 * <p>The sleep happens inside the callable, i.e. when the returned Observable
 * is subscribed to, not when this method is called.
 *
 * @param s the value to emit once the simulated work has finished
 * @return an Observable built from a callable that sleeps and then returns {@code s}
 * @throws InterruptedException declared by the signature; the sleep itself runs in the callable
 */
private Observable<String> longWork(String s) throws InterruptedException {
    final long delayMillis = 1_000;
    return Observable.fromCallable(() -> {
        // Stand-in for the expensive operation.
        Thread.sleep(delayMillis);
        return s;
    });
}

/**
 * Prints the given text followed by an underscore and the current thread,
 * which shows on which thread the caller is executing.
 *
 * @param additional text placed in front of the thread description
 */
private void printCurrentThread(String additional) {
    final Thread current = Thread.currentThread();
    final String line = additional + "_" + current;
    System.out.println(line);
}