AnnonAshera - 3 years ago
Java Question

RxJava Live Reactive Queue (with on off switch)

I'm working on an Android app in Kotlin.

I need to set up a system to be able to do work from a live queue (and observe the results of the work in a stream).

But I also have to be able to toggle the queue processing depending on a couple of external factors (which also come in as streams), like networkIsAvailable (Observable<Boolean>).

I can't use Observable.fromIterable() because that creates the iterable right away, and this queue will change over time and items may get removed.

I need some kind of loop where I can finish the item, check to make sure we should keep going, and then pop the first item of the queue and do that.

I'm not sure how to write a loop like this inside a subscription.

The queue can also become empty and things should start again when the toggle is turned back on.

Perhaps I should push that determination (whether or not to process the next item in the queue) out into a Subject<Boolean>, and then have a subscription to that subject that starts the process again?

Examples:

turn on ---- process top of queue, process top of queue (previous was polled off the queue) --- turn off -- no more processing

turn back on -- process top of queue, queue-empty -- stops

add item to queue -- processes -- stops queue empty

turn off processing -- add item to queue -- doesn't process until it's turned back on

turn on -- processing top item

Answer

You can use the valve() operator from RxJava2Extensions for this.

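import hu.akarnokd.rxjava2.operators.FlowableTransformers;
import io.reactivex.processors.PublishProcessor;

// Emit true to let items through, false to pause the flow.
public PublishProcessor<Boolean> flowControl = PublishProcessor.create();

public void start() {
    Flowable./*...*/
            .compose(FlowableTransformers.valve(flowControl))
            .subscribe(/*...*/);
}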

With this in place, flowControl.onNext(true) lets items flow, and flowControl.onNext(false) pauses the stream and queues incoming items until the valve is opened again.

(Any other Rx2 type must be converted to Flowable to use it.)
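
If it helps to see the idea without any Rx types, here is a minimal plain-Java sketch of the loop described in the question: finish an item, check whether processing is still switched on, then poll the next one. GatedQueue and all of its method names are hypothetical and are not part of RxJava or RxJava2Extensions. The sketch also assumes the work is synchronous and cheap enough to run while holding the lock, which a real Android app would probably not want.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper (not an RxJava API): a live queue drained only while a switch is on. */
class GatedQueue<T, R> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Function<T, R> worker; // does the work for one item
    private final Consumer<R> sink;      // receives each result, like an observer's onNext
    private boolean open;                // the on/off switch; starts off
    private boolean draining;            // prevents re-entrant drain loops

    GatedQueue(Function<T, R> worker, Consumer<R> sink) {
        this.worker = worker;
        this.sink = sink;
    }

    /** Adds an item; it is processed right away only if the switch is on. */
    synchronized void add(T item) {
        queue.add(item);
        drain();
    }

    /** Removes a pending item that has not been processed yet. */
    synchronized boolean remove(T item) {
        return queue.remove(item);
    }

    /** Flips the switch; turning it on resumes work on whatever is pending. */
    synchronized void setOpen(boolean open) {
        this.open = open;
        if (open) {
            drain();
        }
    }

    synchronized int pending() {
        return queue.size();
    }

    // Re-checks the switch before every poll, so the worker or sink may
    // turn processing off in the middle of a drain.
    private void drain() {
        if (draining) {
            return;
        }
        draining = true;
        try {
            while (open && !queue.isEmpty()) {
                sink.accept(worker.apply(queue.poll()));
            }
        } finally {
            draining = false;
        }
    }
}
```

To drive this from the streams in the question, one could subscribe to networkIsAvailable and call setOpen() with each emitted value. Because the switch is checked before every poll and the live queue is read on each iteration, items removed while processing is off are never handed to the worker.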
