Scott Cooper - 16 days ago
Android Question

Pause and resume an observable based on a boolean gate in RxJava 2.X?

Let's say I have a Processor that emits a boolean value whenever a button is pressed; think of it as a toggle.

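// gateValue must be a field (not a local), since the callback below reassigns it.
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();

view.onButtonClicked()
    .subscribe(new Action1<Void>() {
        @Override
        public void call(final Void aVoid) {
            // Flip the toggle and publish the new state.
            gate.onNext(gateValue = !gateValue);
        }
    });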


What I would like to do is use the value of the gate to pause and resume an observable sequence, buffering the emitted values whilst paused.

I've read about this a lot, and while it seems possible in the Reactive Extensions implementations for other languages, RxJava doesn't seem to support it.

Here's an example of what I'd like to achieve. It simply outputs an incrementing value every second. When I press the button, I want the output to stop until I press it again, at which point every item emitted between the two button presses should be output:

Flowable.interval(1, TimeUnit.SECONDS)
    .bufferWhile(gate)  // hypothetical operator, shown only to illustrate the desired behaviour
    .flatMapIterable(longs -> longs)
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(final Long aLong) throws Exception {
            view.displayTime(aLong);
        }
    });


Does anyone know of a way to achieve something like this?

Answer

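The RxJava2Extensions library now provides a valve() operator that does this: it relays upstream items while the gate's latest value is true and buffers them while it is false, releasing the buffered items once the gate emits true again. It is applied with compose(), e.g. .compose(FlowableTransformers.valve(gate, true)), where the second argument is the valve's initial open state.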
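
To make the pause/buffer/flush behaviour concrete, here is a minimal plain-Java sketch of the idea behind a valve. It is not the library's implementation: the class name Valve and the methods onValue and onGate are made up for illustration, and the sketch ignores backpressure, errors and completion, all of which a real operator has to handle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch: pass values through while open, buffer them while closed. */
final class Valve<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private boolean open;

    public Valve(boolean initiallyOpen, Consumer<T> downstream) {
        this.open = initiallyOpen;
        this.downstream = downstream;
    }

    /** Called for every upstream value, e.g. each tick of the interval. */
    public synchronized void onValue(T value) {
        if (open) {
            downstream.accept(value);
        } else {
            buffer.add(value);
        }
    }

    /** Called for every gate value: true opens the valve, false closes it. */
    public synchronized void onGate(boolean isOpen) {
        open = isOpen;
        if (open) {
            // Flush everything collected while paused, in arrival order.
            T next;
            while ((next = buffer.poll()) != null) {
                downstream.accept(next);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        Valve<Long> valve = new Valve<>(true, seen::add);
        valve.onValue(0L);    // open: delivered immediately
        valve.onGate(false);  // button pressed: pause
        valve.onValue(1L);    // buffered
        valve.onValue(2L);    // buffered
        valve.onGate(true);   // button pressed again: 1 and 2 are flushed
        valve.onValue(3L);    // open again: delivered immediately
        System.out.println(seen);
    }
}
```

In the question's terms, onValue corresponds to the ticks of Flowable.interval and onGate to the values pushed into the gate processor by the button.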
