ndori ndori - 1 month ago 9
Android Question

RxJava onBackpressureBuffer not emitting items

I've seen some weird behavior with onBackpressureBuffer, and I'm not sure whether it is valid behavior or a bug.

I have a TCP call that emits items at a certain rate (it uses streaming over an InputStream, but that is just background information).

On top of it I've created an observable using create that emits an item each time one is ready.

Let's call it messages().

Then I'm doing this:

messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(item -> { /* do some work */ });


Using analytics tools, I've noticed that MissingBackpressureException is thrown on rare occasions, so I've added onBackpressureBuffer to the call.

If I add it after observeOn:

messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onBackpressureBuffer()
.subscribe(item -> { /* do some work */ });


everything works fine, but that means it buffers only after the items reach the main (UI) thread, so I preferred it to be like this:

messages()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(item -> { /* do some work */ });


And that is where things start to get weird.

I've noticed that while messages() keeps emitting items, at some point they stop being delivered to the subscriber.

More precisely, this happens after exactly 16 items; apparently the buffer starts holding items without passing them on.

Once I cancel messages() with some sort of timeout mechanism, messages() emits onError() and the buffer immediately emits all the items it has kept (they are then handled).

I've checked whether the subscriber is at fault for doing too much work, but it is not: it has finished its work and still doesn't get the items.

I've also tried calling request(n) in the subscriber, asking for one item after onNext() has finished, but the buffer still doesn't deliver.

I suspect the cause is the interaction between the message queue of the Android main (UI) thread and backpressure, but I can't explain why.

Can someone explain why this is happening? Is this a bug or valid behavior?
Thanks!

Answer

Without knowing how messages() is implemented, and based on the behavior described, this looks like a same-pool deadlock similar to the one in this question.

The workaround, which you didn't try, is to put .onBackpressureBuffer() between subscribeOn and observeOn:

messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer()           // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe(item -> { /* do some work */ });
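To make the suspected mechanism concrete, here is a deliberately simplified model in plain Java. It does not use RxJava. It assumes that, with the buffer placed before subscribeOn, the replenishing request(n) calls coming from observeOn are rescheduled onto the same worker that is busy running the blocking loop inside messages(), so they sit in that worker's queue until the source terminates. The class name, the 100-item source and the inline consumer are all hypothetical; only the initial demand of 16 comes from the behavior described in the question.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of the stall, NOT RxJava internals.
final class SamePoolDeadlockDemo {
    static final int INITIAL_REQUEST = 16; // the stall point reported in the question
    static final int TOTAL_ITEMS = 100;    // hypothetical number of messages

    // All mutable state below is touched only on the single worker thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private final CountDownLatch done = new CountDownLatch(2);
    private final boolean requestsViaWorker;
    private long requested = INITIAL_REQUEST;
    private int delivered;
    private int deliveredWhileSourceRan;

    private SamePoolDeadlockDemo(boolean requestsViaWorker) {
        this.requestsViaWorker = requestsViaWorker;
    }

    // Hand buffered items downstream while there is outstanding demand.
    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            buffer.poll();
            requested--;
            delivered++;
            if (delivered == TOTAL_ITEMS) {
                done.countDown();   // everything has reached the consumer
            } else {
                requestOneMore();   // the consumer asks for a replacement item
            }
        }
    }

    private void requestOneMore() {
        if (requestsViaWorker) {
            // Queued behind the source loop on the same worker, so it cannot run yet.
            worker.execute(() -> { requested++; drain(); });
        } else {
            requested++;            // the request reaches the buffer immediately
        }
    }

    private int[] run() {
        worker.execute(() -> {
            // Stands in for the blocking read loop inside messages().
            for (int i = 0; i < TOTAL_ITEMS; i++) {
                buffer.add(i);
                drain();
            }
            deliveredWhileSourceRan = delivered;
            done.countDown();       // the source has terminated
        });
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return new int[] { deliveredWhileSourceRan, delivered };
    }

    /** Returns {items delivered while the source was running, items delivered in total}. */
    static int[] simulate(boolean requestsViaWorker) {
        return new SamePoolDeadlockDemo(requestsViaWorker).run();
    }

    public static void main(String[] args) {
        int[] before = simulate(true);   // requests routed through the busy worker
        int[] between = simulate(false); // requests reach the buffer directly
        System.out.println(before[0] + " / " + before[1]);   // prints "16 / 100"
        System.out.println(between[0] + " / " + between[1]); // prints "100 / 100"
    }
}
```

In the first run only the initial 16 items get through while the source loop is running, and the rest are flushed once the loop ends and the queued requests finally execute, which matches what the question describes. In the second run nothing routes the requests through the busy worker, so the items keep flowing; that is the effect the reordering above is meant to achieve.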