sdgfsdh sdgfsdh -4 years ago 61
Java Question

How to create Observable out of Runnable?

Sometimes I want to trigger a Runnable as part of my Observable sequence, but the Runnable does not report progress.

I have written a simple factory for wrapping a Runnable object into an Observable:

public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}


Usage:

Observable.concat(
    someTask,
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));


But does RxJava 2 provide this functionality already?

Answer Source

There is no such factory method for Observable, but a Completable can be created from a Runnable. So you can create a Completable first and then convert it to an Observable:

Observable.concat(
    someTask, 
    Completable.fromRunnable(() -> {
        System.out.println("Done");
    }).toObservable()
);

Update: Dealing with exceptions

Completable.fromRunnable internally catches exceptions thrown by its Runnable and pushes them into the stream as onError emissions. However, in Java you have to handle checked exceptions inside the run() method yourself, because run() does not declare any. To avoid that, you can use a Callable instead of a Runnable, since its call() method is declared to throw Exception. Completable.fromCallable() also converts thrown exceptions into onError emissions:

Observable.concat(
    someTask, 
    Completable.fromCallable(() -> {
        System.out.println("Done");
        return null;
    }).toObservable()
);
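
To illustrate the checked-exception point without RxJava, here is a minimal sketch using only the JDK. The class name CheckedExceptionDemo and the method riskyTask are hypothetical, made up for this example. It shows that a Runnable lambda has to catch or wrap a checked exception itself, while a Callable lambda can let it propagate:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class CheckedExceptionDemo {

    // Hypothetical task that throws a checked exception.
    static void riskyTask() throws IOException {
        throw new IOException("disk unavailable");
    }

    // Runnable.run() declares no checked exceptions, so the lambda
    // must handle the IOException itself (here: wrap it as unchecked).
    static Runnable asRunnable() {
        return () -> {
            try {
                riskyTask();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    // Callable.call() declares "throws Exception", so the lambda
    // can let the IOException propagate without a try/catch.
    static Callable<Object> asCallable() {
        return () -> {
            riskyTask();
            return null;
        };
    }

    public static void main(String[] args) {
        try {
            asRunnable().run();
        } catch (RuntimeException e) {
            System.out.println("Runnable: wrapped " + e.getCause().getClass().getSimpleName());
        }
        try {
            asCallable().call();
        } catch (Exception e) {
            System.out.println("Callable: threw " + e.getClass().getSimpleName());
        }
    }
}
```

The same trade-off applies to the lambdas passed to Completable.fromRunnable and Completable.fromCallable.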

A Callable can also be used to create an Observable or a Single that emits a single item.
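
A rough sketch of that, assuming RxJava 2 (the io.reactivex package) is on the classpath. The class name FromCallableDemo and the value "Done" are made up for this example:

```java
import io.reactivex.Observable;
import io.reactivex.Single;

class FromCallableDemo {

    // Single that emits the Callable's return value as its one item.
    static Single<String> single() {
        return Single.fromCallable(() -> "Done");
    }

    // Observable that emits the Callable's return value, then completes.
    static Observable<String> observable() {
        return Observable.fromCallable(() -> "Done");
    }

    public static void main(String[] args) {
        // blockingGet/blockingFirst subscribe and wait for the item.
        System.out.println(single().blockingGet());
        System.out.println(observable().blockingFirst());
    }
}
```

In both cases the Callable is invoked on subscription, not when the Single or Observable is created.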

P.S. Check out the source code, these methods are pretty straightforward.

P.P.S. Kotlin has no checked exceptions ;)


Update 2

There is also a fromAction factory method for creating a Completable. It accepts Action objects, which the documentation describes as:

"A functional interface similar to Runnable but allows throwing a checked exception."

So the code could be simplified to:

Observable.concat(
    someTask, 
    Completable.fromAction(() -> {
        System.out.println("Done");
    }).toObservable()
);