rustot rustot - 1 month ago 14
Java Question

How correctly reduce stream to another stream

I've stream of strings and nulls like

Stream<String> str1 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);


I want to reduce it to another stream, where any sequence of not null string joined together, ie like

Stream<String> str2 = Stream.of("ABC", "", "D", "EF","G")


First way, that i found - create collector that firstly reduce complete input stream to single object with list of all joined strings and then create new stream from it:

class Acc1 {
final private List<String> data = new ArrayList<>();
final private StringBuilder sb = new StringBuilder();

private void accept(final String s) {
if (s != null)
sb.append(s);
else {
data.add(sb.toString());
sb.setLength(0);
}
}

public static Collector<String,Acc1,Stream<String>> collector() {
return Collector.of(Acc1::new, Acc1::accept, (a,b)-> a, acc -> acc.data.stream());
}
}
...
Stream<String> str2 = str.collect(Acc1.collector());


But in this case before any use if str2, even as str2.findFirst(), input stream will be completely processed. It time and memory consuming operation and on infinity stream from some generator it will not work at all

Another way - create external object that will keep intermediate state and use it in flatMap():

class Acc2 {
final private StringBuilder sb = new StringBuilder();

Stream<String> accept(final String s) {
if (s != null) {
sb.append(s);
return Stream.empty();
} else {
final String result = sb.toString();
sb.setLength(0);
return Stream.of(result);
}
}
}
...
Acc2 acc = new Acc2();
Stream<String> str2 = str1.flatMap(acc::accept);


In this case from str1 will be retrieved only elemets that really accessed via str2.

But using of external object, created outside of stream processing, looks ugly for me and probably can cause some side effects, that i do not see now. Also if str2 will be used later with parallelStream() it will cause unpredictable result.

Is there any more correct implemetation of stream->stream reduction without these flaws?

Answer

Reduction or its mutable variant, collect, is always an operation that will process all items. Your operation can be implemented via a custom Spliterator, e.g.

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private StringBuilder sb = new StringBuilder();
            private String last;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> last=str))
                    return false;
                while(last!=null) {
                    sb.append(last);
                    if(!sp.tryAdvance(str -> last=str)) break;
                }
                action.accept(sb.toString());
                sb=new StringBuilder();
                return true;
            }
        }, false);
}

which produces the intended groups, as you can test with

joinGroups(Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null))
    .forEach(System.out::println);

but also has the desired lazy behavior, testable via

joinGroups(
    Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
          .peek(str -> System.out.println("consumed "+str))
).skip(1).filter(s->!s.isEmpty()).findFirst().ifPresent(System.out::println);

After a second thought, I came to this slightly more efficient variant. It will incorporate the StringBuilder only if there are at least two Strings to join, otherwise, it will simply use the already existing sole String instance or the literal "" string for empty groups:

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private String next;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> next=str))
                    return false;
                String string=next;
                if(string==null) string="";
                else if(sp.tryAdvance(str -> next=str) && next!=null) {
                    StringBuilder sb=new StringBuilder().append(string);
                    do sb.append(next);while(sp.tryAdvance(str -> next=str) && next!=null);
                    string=sb.toString();
                }
                action.accept(string);
                return true;
            }
        }, false);
}
Comments