Mateus Zitelli Mateus Zitelli - 1 year ago 100
Java Question

TestScheduler not working on RxJava

I'm trying to test a function where elements of a stream are dispatched one by one after a delay, I was able to get my tests working using

. However, when I use the
I can't get any result.

Check out the code:

public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
objectsStreams.concatMap(objectsStream ->
objectsStream.repeat().concatMap(object ->
.delay(getDuration(object), TimeUnit.MILLISECONDS)));

And the test code:

TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);


//Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);


Checking around for
usage I found out that is usual to pass the scheduler to the
function. So I was able to get the tests to pass by providing the scheduler as an argument to the method
and then to
. However, I still did not get why it was not working before.

Answer Source

The delay operator, by default, uses the computation scheduler for performing time based delays. This information can be found in the documentation of the method. Look for the value in @SchedulerSupport annotation, which in this case is io.reactivex:computation.

For testing purposes you will have to replace the computation Scheduler with a TestScheduler. To be able to replace, you will have to use one of the many overrides of delay operator that takes in a Scheduler.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download