Fetiorin - 1 year ago
Scala Question

Akka-streams mapConcat not working with cycled RunnableGraph

I have a RunnableGraph like the following. When there is a simple map between the broadcast and merge stages, everything is fine. However, when it comes to mapConcat, this code stops working after consuming the first element.

I want to know why it doesn't work.

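import akka.stream.ClosedShape
import akka.stream.scaladsl._

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val M = b.add(MergePreferred[Int](1))
  val B = b.add(Broadcast[Int](2))
  val S = Source(List(3))

  // Forward path: source -> merge -> print -> broadcast -> sink
  S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
  // Feedback path: broadcast -> expand each element into three -> drop non-positive values -> merge
  M.preferred <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
  ClosedShape
})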

// run() output:
// 3
// List(2,2,2)

Answer

The mapConcat stage blocks the feedback loop, and that is expected. Consider the following chain of events:

  1. the mapConcat function prints List(2,2,2)
  2. the mapConcat stage needs demand to emit the first of the 3 available elements (2, 2, 2)
  3. the demand has to come from the Merge stage, and therefore from the Broadcast stage.
  4. the Broadcast stage backpressures if any of its downstreams backpressures. Its downstreams are a Sink.ignore (which never backpressures), and the mapConcat itself.
  5. the mapConcat backpressures if "there are still remaining elements from the previously calculated collection", as per the docs. This is indeed the case.

In other words, your cycle is unbalanced. You are introducing more elements in the feedback loop than you are removing.

This issue is explained in detail in this documentation page, where a couple of solutions are also presented. For your specific case, because of the filter stage you have, introducing a buffer larger than 13 would print all the elements. However, note that the graph will just hang and not complete afterwards.

// Requires: import akka.stream.OverflowStrategy
S ~> M ~> Flow[Int].map { s => println(s); s } ~> B ~> Sink.ignore
M.preferred <~ Flow[Int].buffer(20, OverflowStrategy.dropHead) <~ Flow[Int].map(x => List.fill(3)(x-1)).mapConcat(x => {println(x); x}).filter(_ > 0) <~ B
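
To see how unbalanced the cycle is, it can help to count the elements without Akka at all. The following is only a rough sketch in plain Scala: FeedbackCount, feedback and generations are made-up names for illustration, and the code models just the element counts, not Akka's demand signalling or buffering. It applies the same transformation as the feedback path (fill with three copies of x-1, flatten, keep values > 0) and tracks how many elements each pass around the loop produces.

```scala
object FeedbackCount {
  // Hypothetical helper: the same transformation as the feedback path,
  // i.e. map(x => List.fill(3)(x - 1)), mapConcat, filter(_ > 0).
  def feedback(x: Int): List[Int] = List.fill(3)(x - 1).filter(_ > 0)

  // Successive "generations" of elements, starting from the source elements.
  // Stops once a pass through the feedback path produces nothing.
  def generations(seed: List[Int]): List[List[Int]] =
    Iterator.iterate(seed)(_.flatMap(feedback)).takeWhile(_.nonEmpty).toList

  def main(args: Array[String]): Unit = {
    val sizes = generations(List(3)).map(_.size)
    println(sizes)     // elements per pass around the loop: List(1, 3, 9)
    println(sizes.sum) // total elements reaching the first map stage: 13
  }
}
```

Each pass triples the number of elements until the filter removes the zeros, so 1 + 3 + 9 = 13 elements flow through the merge in total. That total happens to match the figure mentioned above, which suggests (as an assumption, not something the answer states) that the buffer needs to be able to hold everything the loop generates.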