plasma147 - 23 days ago
Java Question

Creating a Flowable that emits items at a limited rate to avoid the need to buffer events

I've got a data access object that passes each item in a data source to a consumer:

public interface Dao<T> {
void forEachItem(Consumer<T> item);
}


This always produces items in a single threaded way - I can't currently change this interface.

I wanted to create a Flowable from this interface:

private static Flowable<String> flowable(final Dao<String> dao) {
    return Flowable.create(emitter -> {
        // forEachItem pushes every item synchronously on the calling thread
        dao.forEachItem(item ->
                emitter.onNext(item));
        emitter.onComplete();
    }, BackpressureStrategy.ERROR);
}


If I use this Flowable in a situation where processing takes longer than the rate at which items are emitted, I understandably get a MissingBackpressureException, as I am using ERROR mode:

Dao<String> exampleDao =
        itemConsumer ->
                IntStream.range(0, 1_000).forEach(i ->
                        itemConsumer.accept(String.valueOf(i)));

flowable(exampleDao)
        .map(v -> {
            Thread.sleep(100);
            return "id:" + v;
        })
        .blockingSubscribe(System.out::println);


I don't wish to buffer items: on very large data sets this could exhaust memory if the operation is significantly slower than the producer.

I was hoping there would be a backpressure mode that makes the emitter block when it is passed next/completion events while downstream is back-pressured, but that does not seem to be the case?

In my case as I know that the dao produces items in a single threaded way I thought I would be able to do something like:

dao.forEachItem(item -> {
    // spin until downstream has requested at least one item
    while (emitter.requested() == 0) {
        waitABit();
    }
    emitter.onNext(item);
});


but this seems to hang forever.

How wrong is my approach? :-) Is there a way of producing items in a way that respects downstream back pressure given my (relatively restrictive) set of circumstances?

I know I could do this with a separate process writing to a queue and then write a Flowable that consumes from that queue. Would that be the preferred approach instead?

Answer Source

Check the documentation of Flowable, especially the part about Subscription.request(long). I hope that points you in the right direction.


The TestProducer from this example produces Integer objects in a given range and pushes them to its Subscriber. It extends the Flowable<Integer> class. For a new subscriber, it creates a Subscription object whose request(long) method is used to create and publish the Integer values.

It is important that the Subscription passed to the subscriber tolerates reentrant calls: request() calls onNext() on the subscriber, and the subscriber may call request() again from within that onNext() call. To prevent a stack overflow, the implementation shown uses the outStandingRequests counter and the isProducing flag.

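import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TestProducer extends Flowable<Integer> {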
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // if a request is already being fulfilled, just record the demand and return;
                    // this prevents recursion when subscriber.onNext() calls request() again
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    // stop producing as soon as the subscriber cancels
                    while (!cancelled && outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}
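The volatile isProducing flag guards against recursion on a single thread, but checking and setting it are two separate steps, so it is not safe if request() is ever called from two threads at once. A common alternative is to let the demand counter itself decide who runs the loop. The following is only a sketch of that idea, written against the JDK's own java.util.concurrent.Flow interfaces (Java 9+) so that it has no dependencies; RangeSubscription and ReentrantRequestDemo are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical range source built on the JDK's Flow interfaces, not on RxJava. */
final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final int to;
    private int next; // only touched by the caller that currently owns the emission loop
    private volatile boolean cancelled;
    private final AtomicLong requested = new AtomicLong();

    RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int from, int to) {
        this.subscriber = subscriber;
        this.next = from;
        this.to = to;
    }

    /** Adds two non-negative demands, capping at Long.MAX_VALUE instead of overflowing. */
    private static long addCapped(long current, long n) {
        long sum = current + n;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    @Override
    public void request(long n) {
        if (n <= 0 || cancelled) {
            return; // simplification: Reactive Streams would signal onError for n <= 0
        }
        // The previous demand is read and updated atomically. A non-zero value means an
        // emission loop is already running (maybe further up this call stack) and will
        // pick up the extra demand, so this call must not start a second loop.
        if (requested.getAndAccumulate(n, RangeSubscription::addCapped) != 0) {
            return;
        }
        for (;;) {
            long demand = requested.get();
            long emitted = 0;
            while (emitted != demand && next <= to && !cancelled) {
                subscriber.onNext(next++); // may call request() reentrantly
                emitted++;
            }
            if (cancelled) {
                return;
            }
            if (next > to) {
                cancelled = true; // stop serving later requests
                subscriber.onComplete();
                return;
            }
            // Give back what was emitted; zero means no new demand arrived meanwhile.
            if (requested.addAndGet(-emitted) == 0) {
                return;
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

final class ReentrantRequestDemo {
    /** Collects a range with a consumer that requests one more item inside every onNext. */
    static List<Integer> run(int from, int to) {
        List<Integer> received = new ArrayList<>();
        Flow.Subscriber<Integer> consumer = new Flow.Subscriber<Integer>() {
            private Flow.Subscription upstream;

            @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
            @Override public void onNext(Integer item) { received.add(item); upstream.request(1); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { received.add(-1); } // -1 marks completion
        };
        consumer.onSubscribe(new RangeSubscription(consumer, from, to));
        return received;
    }
}
```

ReentrantRequestDemo.run(1, 3) returns [1, 2, 3, -1]. Because the nested request(1) calls return immediately, the stack depth stays constant however long the range is.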

The consumer in this example extends DefaultSubscriber<Integer>. It requests one item on start and one more after consuming each Integer. Consuming every fifth value includes a short delay, so backpressure builds up for the producer.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

In the following main method of a test class, the producer and consumer are created and wired up:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

When running the example, the log output shows that the consumer runs continuously, while the producer only becomes active when the internal Flowable buffer of RxJava 2 needs to be refilled.
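The queue-based alternative mentioned in the question can also be sketched with plain JDK classes. The idea is to run the push-style forEachItem call on its own thread and hand items over through a small bounded queue, so that the producer thread blocks in put() whenever the consumer falls behind. PushToPullBridge and its iterate method are hypothetical names, the source is modelled as a Consumer<Consumer<T>> (a method reference such as dao::forEachItem should fit that shape), and error propagation and cancellation are left out for brevity.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical helper: exposes a push-style source as a pull-style Iterator. */
final class PushToPullBridge {

    /** End-of-data marker; compared by identity, so it cannot collide with a real item. */
    private static final Object END = new Object();

    /** Blocks until there is room in the queue. */
    private static void put(BlockingQueue<Object> queue, Object value) {
        try {
            queue.put(value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
    }

    /**
     * Runs the source on a daemon thread. At most 'capacity' items wait in the queue;
     * after that the source is paused inside put() until the consumer takes one.
     * Items must be non-null. If the source throws, the iterator simply ends.
     */
    static <T> Iterator<T> iterate(Consumer<Consumer<T>> source, int capacity) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                source.accept(item -> put(queue, item));
            } finally {
                put(queue, END);
            }
        });
        producer.setDaemon(true); // an abandoned iterator must not keep the JVM alive
        producer.start();

        return new Iterator<T>() {
            private Object lookahead; // null means nothing has been fetched yet

            @Override
            public boolean hasNext() {
                if (lookahead == null) {
                    try {
                        lookahead = queue.take(); // waits for the producer if necessary
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting for data", e);
                    }
                }
                return lookahead != END;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T value = (T) lookahead;
                lookahead = null;
                return value;
            }
        };
    }
}
```

Such an iterator could then be handed to Flowable.fromIterable, which only pulls as many items as downstream has requested. Note that the resulting Iterable is single-use here unless a fresh iterate call is made per subscription.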