borune borune - 1 month ago 18
Android Question

rx subscribeOn didn't work with Subject?

here is the code:

Observable<Integer> observable = Observable.just(1, 2);
observable.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation())
.doOnNext(i -> System.out.println("Emitting " + i + " after subscribeOn" + " on thread " + Thread.currentThread().getName()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -> System.out.println( "Receiving " + i + " on thread " + Thread.currentThread().getName()));


it gives me this output:

I/System.out: Emitting 1 on thread RxComputationScheduler-1
I/System.out: Emitting 1 after subscribeOn on thread RxComputationScheduler-1
I/System.out: Emitting 2 on thread RxComputationScheduler-1
I/System.out: Emitting 2 after subscribeOn on thread RxComputationScheduler-1
I/System.out: Receiving 1 on thread main
I/System.out: Receiving 2 on thread main


all is okay.

Now i wanna to do same thing with Subject, i've changed the code as follows:

PublishSubject<Integer> subject = PublishSubject.create();

subject.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation())
.doOnNext(i -> System.out.println("Emitting " + i + " after subscribeOn" + " on thread " + Thread.currentThread().getName()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -> System.out.println( "Receiving " + i + " on thread " + Thread.currentThread().getName()));

subject.onNext(1);


and in this case i get no output. Why so? And how i can force Subject emit flow in given Scheduler?

Answer

decision is:

PublishSubject<Integer> subject = PublishSubject.create();
subject
      **.observeOn(Schedulers.computation())**
      .doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))                    
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(i -> System.out.println( "Receiving " + i + " on thread " + Thread.currentThread().getName()));

subject.onNext(1);

or u can call subject.onNext(1) from Schedulers.computation() thread