mrobinson7627 - 2 months ago
Android Question

RxAndroid Observable running on unexpected thread

I'm trying to create an Observable that loads some data from the network on an interval, and also whenever the user refreshes the page. This is the gist of what I have so far:

PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.flatMap(t -> {
    // network operations that eventually return a value
    // these operations are not observables themselves
    // they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // update ui with data
}, error -> {
    // do something with error
});


Later in a refresh callback I have:

refreshSubject.onNext(0L);


It runs fine on the interval, but when I refresh, it explodes with a NetworkOnMainThreadException. I thought I had handled this with subscribeOn/observeOn. What am I missing? Also, why doesn't this cause a crash when the Observer is triggered from the interval?

Answer

You have to change your subscribeOn(Schedulers.io()) to observeOn(Schedulers.io()) and move it above your flatMap. The reason is that your refreshSubject is a PublishSubject, which is both an Observable and an Observer, so it emits on whatever thread calls its onNext().

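subscribeOn only affects the thread on which the subscription to the source is made; it does not move values that a subject emits later. When your refresh callback calls refreshSubject.onNext(0L) on the main thread, the value passes through flatMap on that same thread before it ever reaches observeOn(AndroidSchedulers.mainThread()), hence the NetworkOnMainThreadException. This is also why the interval case works: interval emits on the computation scheduler by default, so the flatMap work triggered by it never runs on the main thread.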
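
To make the threading rule concrete, here is a minimal sketch in plain Java with no RxJava dependency. ToySubject, threadWithoutHop, threadWithHop and the thread name "toy-io" are hypothetical names invented for this illustration; the class only models the behaviour described above (a subject runs its observers on the caller's thread unless something hops threads first) and is not how RxJava is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model only: illustrates caller-thread emission, not RxJava's real code. */
public class SubjectThreadDemo {

    /** Hypothetical stand-in for a PublishSubject: onNext calls observers synchronously. */
    static final class ToySubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void onNext(T value) {
            // No thread hop here: observers run on whichever thread called onNext.
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    /** Returns the name of the thread that ran the downstream work when nothing hops threads. */
    public static String threadWithoutHop() {
        ToySubject<Long> subject = new ToySubject<>();
        String[] seen = new String[1];
        subject.subscribe(v -> seen[0] = Thread.currentThread().getName());
        subject.onNext(0L);
        return seen[0];
    }

    /** Same, but the observer first hands the value to an executor (an observeOn-like hop). */
    public static String threadWithHop() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-io"));
        try {
            ToySubject<Long> subject = new ToySubject<>();
            String[] seen = new String[1];
            CountDownLatch done = new CountDownLatch(1);
            subject.subscribe(v -> io.execute(() -> {
                seen[0] = Thread.currentThread().getName();
                done.countDown();
            }));
            subject.onNext(0L);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the worker thread");
            }
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:      " + Thread.currentThread().getName());
        System.out.println("without hop: " + threadWithoutHop());
        System.out.println("with hop:    " + threadWithHop());
    }
}
```

In this model the "without hop" line reports the caller's own thread, which corresponds to flatMap running on the main thread when the refresh callback calls onNext(). The "with hop" line reports toy-io, which corresponds to placing observeOn(Schedulers.io()) above flatMap.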

Compare the output of these two snippets:

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
    // do something with error
});

vs

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
    // do something with error
});