bakua bakua - 25 days ago 24
Android Question

RxJava Subject emits on incorrect scheduler

I have the following class, which I hold as a singleton:

public class SessionStore {
    private final Subject<Session, Session> subject;

    public SessionStore() {
        // BehaviorSubject replays the latest value to each new subscriber
        subject = new SerializedSubject<>(BehaviorSubject.create(new Session()));
    }

    public void set(Session session) {
        subject.onNext(session);
    }

    public Observable<Session> observe() {
        return subject.distinctUntilChanged();
    }
}


In the activity, I observe the session and perform a network operation on each change:

private Subscription init() {
    return sessionStore
            .observe()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(new Func1<Session, Observable<Object>>() {
                @Override
                public Observable<Object> call(Session session) {
                    return retrofitService.getAThing();
                }
            })
            .subscribe(...);
}


When I subscribe to the session store, the subject emits on io() immediately, as it is a BehaviorSubject, and the subscriber runs on mainThread().

The issue comes when I call sessionStore.set(new AnotherSession()) while already subscribed to it. IMO this should execute the stream defined in init() on the io() scheduler. However, what happens instead is that the stream executes on the same thread that subject.onNext() was called on, resulting in a NetworkOnMainThreadException, as I am doing a network operation in flatMap().

Do I understand subjects wrong? Do I misuse them this way? What is the proper solution then please?

I've also tried replacing the whole subject approach with Observable.fromEmitter() in the observe() method, but surprisingly the result was the very same.

Answer

Please have a look at the following passage from the book 'Reactive Programming with RxJava':

By default calling onNext() on a Subject is directly propagated to all Observer's onNext() callback methods. It is not a surprise that these methods share the same name. In a way, calling onNext() on Subject indirectly invokes onNext() on each and every Subscriber.

Let's recap: if you call onNext on a Subject from Thread-1, it will invoke the subscriber's onNext on Thread-1. subscribeOn is disregarded for these emissions.
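
To make the recap concrete, here is a minimal sketch using only the JDK. MiniSubject is a hypothetical, heavily simplified stand-in and not RxJava's Subject implementation; it only models the one property discussed here, namely that onNext calls the observers synchronously on whichever thread invoked it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a subject; NOT RxJava's implementation.
public class MiniSubjectDemo {

    static final class MiniSubject<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        // Delivers the value synchronously: observers run on the caller's thread.
        void onNext(T value) {
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    // Calls onNext from a thread with the given name and returns the name of
    // the thread on which the observer was actually invoked.
    static String observerThreadFor(String producerThreadName) {
        MiniSubject<String> subject = new MiniSubject<>();
        String[] seen = new String[1];
        subject.subscribe(value -> seen[0] = Thread.currentThread().getName());

        Thread producer = new Thread(() -> subject.onNext("123"), producerThreadName);
        producer.start();
        try {
            producer.join(); // join() also makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        // prints "observer ran on: Thread-1"
        System.out.println("observer ran on: " + observerThreadFor("Thread-1"));
    }
}
```

The subscriber was registered from the calling thread, yet it runs on the producer thread. Where the subscription happened has no influence on where later values are delivered.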

So first things first: on which thread will the subscription happen for:

retrofitService.getAThing()

I will just guess and say it is the invoking thread, which would be the thread specified in observeOn: the Android UI loop.

Every value below observeOn is shifted from thread A to thread B, as specified by the scheduler. The observeOn for the UI loop should come right before the subscribe call. Every value received in the subscriber will then be on the UI loop, while the network call itself runs on another thread (for example by applying subscribeOn(Schedulers.io()) to the inner observable), so it neither blocks the UI thread nor ends in an exception.
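
The thread hand-off described above can be modelled with plain JDK executors. This is only an analogy under stated assumptions, not RxJava code: the executor named "io-worker" plays the role of Schedulers.io() for the blocking call, the executor named "ui-loop" plays the role of the scheduler passed to observeOn, and both names as well as the ThreadHopDemo class are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy of "do the work on io, deliver the result on the UI loop".
public class ThreadHopDemo {

    // Returns { thread that ran the blocking work, thread that received the result }.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-loop"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                    // stands in for the network call, executed on the io executor
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return "response";
                    }, io)
                    // stands in for the subscriber, executed on the ui executor
                    .thenAcceptAsync(response -> threads[1] = Thread.currentThread().getName(), ui)
                    .join(); // wait until both stages have completed
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("work ran on:   " + threads[0]); // io-worker
        System.out.println("result seen on: " + threads[1]); // ui-loop
    }
}
```

Mapped back to the chain in the question, this would correspond to applying subscribeOn(Schedulers.io()) to retrofitService.getAThing() inside flatMap and placing observeOn(AndroidSchedulers.mainThread()) directly before subscribe.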

Please have a look at the example code and its output:

class SessionStore {
    private Subject<String, String> subject;

    public SessionStore() {
        subject = BehaviorSubject.create("wurst").toSerialized();
    }

    public void set(String session) {
        subject.onNext(session);
    }

    public Observable<String> observe() {
        return subject
                .asObservable()
                .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
                .distinctUntilChanged();
    }
}

@Test
public void name() throws Exception {
    // init
    SessionStore sessionStore = new SessionStore();

    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    Subscription subscribe = sessionStore
            .observe()
            .flatMap(s -> {
                return Observable.fromCallable(() -> {
                    System.out.println("flatMap Thread:: " + Thread.currentThread());
                    return s;
                }).subscribeOn(Schedulers.io());
            })
            .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
            .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
            .subscribe(testSubscriber); // Do UI-Stuff in subscribe

    new Thread(() -> {
        System.out.println("set on Thread:: " + Thread.currentThread());
        sessionStore.set("123");
    }).start();

    new Thread(() -> {
        System.out.println("set on Thread:: " + Thread.currentThread());
        sessionStore.set("345");
    }).start();

    boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);

    Assert.assertTrue(b);
}

Output::

Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]