Mateus Zitelli - 26 days ago
Java Question

TestScheduler not working on RxJava

I'm trying to test a function where elements of a stream are dispatched one by one after a delay. I was able to get my tests working using Thread.sleep. However, when I use TestScheduler.advanceTimeBy, I can't get any result.

Check out the code:

public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
    // Emit each object only after its own delay has elapsed, one at a time
    return objectsStreams.concatMap(objectsStream ->
        objectsStream.repeat().concatMap(object ->
            Observable.just(object)
                .delay(getDuration(object), TimeUnit.MILLISECONDS)));
}


And the test code:

TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);

model.getDelayedObjects(objectStreamSubject)
    .observeOn(testScheduler)
    .subscribeOn(testScheduler)
    .subscribe(testSubscriber);

testScheduler.triggerActions();
//Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
testSubscriber.assertReceivedOnNext(objects);


Update:



Looking around for TestScheduler usage, I found that it is usual to pass the scheduler to the delay function. So I was able to get the tests to pass by providing the scheduler as an argument to the method getDelayedObjects and then to delay. However, I still don't understand why it was not working before.

Answer

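The delay operator, by default, uses the computation scheduler for its time-based delays. This is stated in the method's documentation: look for the value of the @SchedulerSupport annotation, which in this case is io.reactivex:computation. Because the timers run on the computation scheduler's real clock, advancing the virtual clock of a TestScheduler that is only applied through observeOn and subscribeOn has no effect on them. That is why Thread.sleep worked and advanceTimeBy did not.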
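The behaviour described above can be reproduced in isolation. The following is a minimal sketch, assuming RxJava 1.x (the rx.* packages the question's code appears to use) is on the classpath; the class name DefaultDelayDemo and the 300 ms delay are made up for illustration.

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

// Hypothetical demo class, not part of the question's code.
public class DefaultDelayDemo {

    public static void main(String[] args) {
        TestScheduler testScheduler = new TestScheduler();
        TestSubscriber<String> testSubscriber = new TestSubscriber<>();

        // No scheduler is passed to delay, so it falls back to the
        // computation scheduler, which runs on real (wall-clock) time.
        Observable.just("a")
            .delay(300, TimeUnit.MILLISECONDS)
            .subscribe(testSubscriber);

        // Advancing virtual time does not touch the computation scheduler,
        // so nothing has been delivered yet.
        testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
        testSubscriber.assertNoValues();

        // Waiting in real time is what eventually lets the item through,
        // which matches the Thread.sleep observation in the question.
        testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
        testSubscriber.assertValues("a");
    }
}
```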

For testing purposes, you will have to replace the computation scheduler with a TestScheduler. To do that, use one of the overloads of the delay operator that takes a Scheduler.
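As a rough sketch of that approach, the method from the question can take the scheduler as a parameter and forward it to delay. This assumes RxJava 1.x; the class name DelayedObjectsExample, the fixed 300 ms getDuration stand-in, and the sample values are hypothetical, since the original getDuration and test data are not shown.

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

// Hypothetical wrapper class for illustration only.
public class DelayedObjectsExample {

    // Stand-in for the question's getDuration(object); assumed to be 300 ms.
    static long getDuration(Object object) {
        return 300;
    }

    // Same shape as the question's method, but the scheduler is injected
    // and handed to the delay overload that accepts a Scheduler.
    static Observable<Object> getDelayedObjects(
            Observable<Observable<Object>> objectsStreams, Scheduler scheduler) {
        return objectsStreams.concatMap(objectsStream ->
            objectsStream.repeat().concatMap(object ->
                Observable.just(object)
                    .delay(getDuration(object), TimeUnit.MILLISECONDS, scheduler)));
    }

    public static void main(String[] args) {
        TestScheduler testScheduler = new TestScheduler();
        TestSubscriber<Object> testSubscriber = new TestSubscriber<>();

        Observable<Object> objectsStream = Observable.<Object>just("a", "b", "c");

        getDelayedObjects(Observable.just(objectsStream), testScheduler)
            .subscribe(testSubscriber);

        // The delays now run on the TestScheduler's virtual clock, so
        // advancing it by 900 ms releases the three 300 ms items in order.
        testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
        testSubscriber.assertValues("a", "b", "c");
    }
}
```

Production code can pass Schedulers.computation() to keep the default behaviour, while tests pass a TestScheduler and no longer need to sleep.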
