Scott Cooper Scott Cooper - 7 months ago 92
Android Question

Pause and resume an observable based on a boolean gate in RxJava 2.X?

Let's say I have a Processor that emits a boolean value when ever a button is pressed, think of this as a toggle.

boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
.subscribe(new Action1<Void>() {
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);

What I would like to do is use the value of the gate to pause and resume an observable sequence, buffering the emitted values whilst paused.

I've read into this a lot and while it seems possible in reactive extensions for other languages, RxJava doesn't seem to support it.

Here's an example of what I'd like to achieve, it simply outputs an incremental value every second. When I press the button I want the output to stop until I press it again which should output every item emitted between the two button presses:

Flowable.interval(1, TimeUnit.SECONDS)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
public void accept(final Long aLong) throws Exception {

Does anyone know of a way to achieve something like this?


There is now an operator valve() in the RxJava2Extensions library that does the requested behavior.