James Davies - 3 months ago
Scala Question

Splitting a scalaz-stream process into two child streams

Using scalaz-stream, is it possible to split/fork and then rejoin a stream?

As an example, let's say I have the following code:

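import scalaz.concurrent.Task
import scalaz.stream._

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(_ % 2 == 0).fold(0)(_ + _)
val sumOfOddNumbers = streamOfNumbers.filter(_ % 2 != 0).fold(0)(_ + _)

// someEffectfulFunction is some Sink[Task, (Int, Int)]
sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)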


With scalaz-stream, the result in this example is as you would expect: a tuple of the two sums (30 for the even numbers and 25 for the odd numbers from 1 to 10) is passed to the sink.

However, if we replace streamOfNumbers with something that requires IO, the IO action is actually executed twice, once for each of the two derived streams.
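The double execution happens because a Process is a description that each consumer runs from the beginning. The plain-Scala analogy below (standard library only, not scalaz-stream code; the names `RerunDemo`, `source`, `sumWhere` and `reads` are hypothetical) reproduces the effect: each derived "stream" re-evaluates the source, so its side effect happens once per consumer.

```scala
object RerunDemo {
  // Counts how many times the "IO" behind the source is performed
  var reads = 0

  // Hypothetical effectful source: a re-runnable description, like a Process
  def source(): Iterator[Int] = {
    reads += 1 // stands in for opening a file, querying a database, etc.
    (1 to 10).iterator
  }

  // Each derived "stream" runs the source again from the start
  def sumWhere(p: Int => Boolean): Int = source().filter(p).sum

  def main(args: Array[String]): Unit = {
    val sums = (sumWhere(_ % 2 == 0), sumWhere(_ % 2 != 0))
    println(s"sums = $sums, source evaluated $reads times")
  }
}
```

Running main prints the sums (30,25) together with a count of 2, mirroring the two IO executions described above.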

Using a Topic, I'm able to create a pub/sub process that duplicates the elements of the stream correctly. However, it does not buffer: it simply consumes the entire source as fast as possible, regardless of the pace at which the sinks consume it.

I can wrap this in a bounded Queue, but the end result feels a lot more complex than it needs to be.
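For comparison, the "duplicate into bounded buffers" idea can be sketched with plain java.util.concurrent primitives. This is not scalaz-stream's Topic or Queue API, and `BoundedFanOut`, `evenOddSums` and `consumer` are hypothetical names. The producer reads each element once and puts a copy into one bounded queue per consumer. Because `put` blocks while a queue is full, the producer can only run about `capacity` elements ahead of the slower consumer, which is the backpressure the plain Topic lacks.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedFanOut {
  // Starts a consumer thread that sums the elements of `q` accepted by `keep`.
  // None marks the end of the stream; the total is stored in out(0).
  private def consumer(q: ArrayBlockingQueue[Option[Int]], keep: Int => Boolean, out: Array[Int]): Thread = {
    val t = new Thread(() => {
      var acc = 0
      var done = false
      while (!done) q.take() match { // take() blocks until the producer puts something
        case Some(x) => if (keep(x)) acc += x
        case None    => done = true
      }
      out(0) = acc
    })
    t.start()
    t
  }

  // Reads `source` exactly once and copies every element into two bounded queues.
  // put() blocks while a queue is full, throttling the producer to the slower consumer.
  def evenOddSums(source: Iterator[Int], capacity: Int): (Int, Int) = {
    val evenQ = new ArrayBlockingQueue[Option[Int]](capacity)
    val oddQ = new ArrayBlockingQueue[Option[Int]](capacity)
    val evenOut = new Array[Int](1)
    val oddOut = new Array[Int](1)
    val threads = Seq(
      consumer(evenQ, _ % 2 == 0, evenOut),
      consumer(oddQ, _ % 2 != 0, oddOut)
    )
    source.foreach { x => evenQ.put(Some(x)); oddQ.put(Some(x)) }
    evenQ.put(None)
    oddQ.put(None)
    threads.foreach(_.join()) // join() also makes the consumers' results visible here
    (evenOut(0), oddOut(0))
  }

  def main(args: Array[String]): Unit =
    println(evenOddSums((1 to 10).iterator, capacity = 2))
}
```

Even this stripped-down version needs threads, a termination marker and per-consumer queues, which illustrates why the Topic-plus-Queue combination feels heavier than the problem seems to warrant.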

Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?

Answer

Also, to clarify: the previous answer deals with the "splitting" requirement. Your specific problem, however, may be solvable without splitting the stream at all:

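import scalaz.{\/, \/-, -\/}
import scalaz.concurrent.Task
import scalaz.stream._

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

// Tag each number instead of splitting the stream:
// odd numbers go on the left (the "written" side), even numbers on the right (the output side)
val oddOrEven: Process[Task, Int \/ Int] = streamOfNumbers.map {
  case even if even % 2 == 0 => \/-(even)
  case odd                   => -\/(odd)
}

// sump1 is not defined in this answer; assumed here to be a Process1 that sums its input
val sump1: Process1[Int, Int] = process1.fold[Int, Int](0)(_ + _)

// Sum each side independently; oddOrEven is a single process, so the source runs only once
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task, Int] = ???
val oddSink: Sink[Task, Int] = ???

// drainW sends the left (odd) values to oddSink; the right (even) values continue to evenSink
summed
  .drainW(oddSink)
  .to(evenSink)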
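The core idea of the answer (tag each element instead of splitting the stream, then reduce each side separately) can be checked without scalaz-stream. The sketch below is a plain-Scala analogy under that assumption: standard library only, with Either standing in for scalaz's \/ and the hypothetical names `TagAndFold`, `tagged`, `sumBoth` and `reads`. Left plays the role of the written (odd) side and Right the output (even) side, and the source is evaluated exactly once.

```scala
object TagAndFold {
  var reads = 0 // counts how often the effectful source is run

  // Hypothetical effectful source
  def source(): Iterator[Int] = { reads += 1; (1 to 10).iterator }

  // Tag odd numbers as Left and even numbers as Right, mirroring oddOrEven above
  def tagged(it: Iterator[Int]): Iterator[Either[Int, Int]] =
    it.map(n => if (n % 2 == 0) Right(n) else Left(n))

  // One pass over the tagged stream: sum the Left (odd) and Right (even) sides independently
  def sumBoth(it: Iterator[Either[Int, Int]]): (Int, Int) =
    it.foldLeft((0, 0)) {
      case ((odd, even), Left(n))  => (odd + n, even)
      case ((odd, even), Right(n)) => (odd, even + n)
    }

  def main(args: Array[String]): Unit = {
    val (oddSum, evenSum) = sumBoth(tagged(source()))
    println(s"odd = $oddSum, even = $evenSum, source evaluated $reads time(s)")
  }
}
```

Because both sums are computed from one traversal, there is nothing to duplicate and no buffering between consumers is needed, which is what makes this approach simpler than a Topic with bounded queues.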