babernathy babernathy - 6 months ago 25
Java Question

Properly handling empty Observable in RxJava

I have a situation where I am creating an Observable containing results from a database. I am then applying a series of filters to them. I then have a subscriber that is logging the results. It may be the case that no elements make their way though the filters. My business logic states this is not an error. However, when this happens my onError is called and contains the following exception:

java.util.NoSuchElementException: Sequence contains no elements


Is the accepted practice to just detect that type of exception and ignore it? Or is there a better way to handle this?

The version is 1.0.0.

Here is a simple test case that exposes what I'm seeing. It appears to be related to having all the events filtered before reaching a map and reduce.

@Test
public void test()
{

Integer values[] = new Integer[]{1, 2, 3, 4, 5};

Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
})

.subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println(s);
}
});
}


Because I am using a safe subscriber, it initially throws an OnErrorNotImplementedException which wraps the following exception:

java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)


Based on the answer from @davem below, I created a new test case:

@Test
public void testFromBlockingAndSingle()
{

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String, String, String>()
{
@Override
public String call(String s, String s2)
{
return s + "," + s2;
}
}).toList().toBlocking().single();

System.out.println("Test: " + results + " Size: " + results.size());

}


And this test results in the following behavior:

When the input is:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};


Then the results (as expected) are:

Test: [-2,-1] Size: 1


And when the input is:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};


Then the result is the following stack trace:

java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)


So it appears that the problem is definitely with the use of the reduce function. See the following test case that handles both situations:

@Test
public void testNoReduce()
{

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer, String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).toList().toBlocking().first();

Iterator<String> itr = results.iterator();
StringBuilder b = new StringBuilder();

while (itr.hasNext())
{
b.append(itr.next());

if (itr.hasNext())
b.append(",");
}

System.out.println("Test NoReduce: " + b);

}


With the following input:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};


I get the following results which are expected:

Test NoReduce: -2,-1


And with the following input:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};


I get the following output which are expected:

Test NoReduce:


So, unless I am completely misunderstanding something, the only way to really handle a zero element Observable that results from a filter when followed by a map and reduce is to implement the reduce logic outside of the Observable chain. Do you all agree with that statement?




Final Solution

Here is my final solution after implementing what both Tomáš Dvořák and David Motten suggested. I think this solution is reasonable.

@Test
public void testWithToList()
{

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

Observable.from(values).filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).toList().map(new Func1<List<Integer>, String>()
{
@Override
public String call(List<Integer> integers)
{
Iterator<Integer> intItr = integers.iterator();
StringBuilder b = new StringBuilder();

while (intItr.hasNext())
{
b.append(intItr.next());

if (intItr.hasNext())
{
b.append(",");
}
}

return b.toString();
}
}).subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println("With a toList: " + s);
}
});

}


Here is how this test behaves when given the following inputs.

When given a stream that will have some values pass through the filters:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};


The result is:

With a toList: -2,-1


When given a stream that will not have any values pass through the filters:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};


The result is:

With a toList: <empty string>

Answer

Now after your update the error is quite obvious. Reduce in RxJava will fail with an IllegalArgumentException if the observable it is reducing is empty, exactly as per specification (http://reactivex.io/documentation/operators/reduce.html).

In functional programming, there are usually two generic operators that aggregate a collection into a single value, fold and reduce. In the accepted terminology, fold takes an initial accumulator value, and a function that takes a running accumulator and a value from the collection and produces another accumulator value. An example in pseudocode:

[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)

will start with 0, and eventually add 1, 2, 3, 4 to the running accumulator, finally yielding 10, the sum of the values.

Reduce is very similar, only it doesn't take the initial accumulator value explicitly, it uses the first value as an initial accumulator, and then accumulates all the remaining values. This makes sense if you are e.g. looking for a minimum or maximum value.

[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))

Looking at fold and reduce different way, you will probably use foldwhenever the aggregated value will make sense even on empty collection (like, in sum, 0 makes sense), and reduce otherwise (minimum makes no sense on an empty collection, and reduce will fail to operate on such collection, in your case by throwing an exception).

You are doing a similar aggregation, interspersing a collection of strings with a comma to produce a single string. That is a little bit more difficult situation. It probably makes sense on an empty collection (you probably expect an empty string), but on the other hand, if you start with an empty accumulator, you will have one more comma in the result than you expect. Correct solution to this is to check, whether the collection is empty first, and either return a fall back string for an empty collection, or do a reduce on a non-empty collection. You will probably observe, that often you don't actually want an empty string in the empty collection case, but something like "collection is empty" might be more appropriate, thus further assuring you that this solution is the clean one.

Btw, I'm using the word collection here instead of observable freely, just for educational purposes. Also, in RxJava, both fold and reduce are called the same, reduce, only there are two versions of that method, one taking just one parameter, the other two parameters.

As for your final question: you don't have to leave the Observable chain. Just use toList(), as David Motten suggests.

.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))

where intersperse could be implemented in terms of reduce, if not already a library function (it is quite common).

collection.intersperse(separator) = 
    if (collection.isEmpty()) 
      ""
    else
      collection.reduce(accumulator, element => accumulator + separator + element)