RemcoW - 22 days ago
Scala Question

Akka Stream Graph parallelisation

I've created a Graph which contains a Balance. This Balance distributes the load over 5 Flows. I expected every instance of my Flow to run on a separate Thread. However, this is not what happens: when I print the Thread name, I notice that all Flows are executed on the same Thread.

The code I'm using is:

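import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

implicit val system = ActorSystem("Stream_PoC")
implicit val materializer = ActorMaterializer()

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._ // provides the ~> operator

  val in = Source(1 to 10)
  val out = Sink.ignore

  val bal = builder.add(Balance[Int](5))
  val merge = builder.add(Merge[Int](5))

  // Each flow prints the thread it runs on; .async gives each flow its own async boundary
  val f1, f2, f3, f4, f5 = Flow[Int].map { x =>
    println(Thread.currentThread())
    x
  }.async

  in ~> bal ~> f1 ~> merge ~> out
        bal ~> f2 ~> merge
        bal ~> f3 ~> merge
        bal ~> f4 ~> merge
        bal ~> f5 ~> merge

  ClosedShape
}).run()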


This outputs:

Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

My expectation was that the output would be something along the lines of:

Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-1,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-2,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-3,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-4,5,main]
Thread[Stream_PoC-akka.actor.default-dispatcher-5,5,main]

How can I change this code sample so that the Flows are executed in parallel?

Answer

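The async directive does not guarantee that your stages will be executed on separate threads. It only places each flow in its own asynchronous island, run by its own actor, and those actors are scheduled on the dispatcher's shared thread pool. As long as the stages do not overlap in time, they may well all run on the same thread.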
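To see the other side of this, here is a minimal sketch of the question's graph in which every flow does some simulated slow work, so the five async flows are busy at the same time. Assumptions: the Akka 2.4/2.5-era setup with ActorMaterializer (on Akka 2.6+ an implicit ActorSystem is enough), a 100 ms Thread.sleep standing in for real work (blocking a default-dispatcher thread like this is only acceptable in a demo), and the hypothetical names OverlapDemo and threadNames. With overlapping work you would typically see more than one distinct thread name, but the exact count depends on the dispatcher configuration and the machine.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

// Hypothetical demo object (assumption: Akka 2.4/2.5-style materializer setup)
object OverlapDemo extends App {
  implicit val system = ActorSystem("Stream_PoC")
  implicit val materializer = ActorMaterializer()

  // Thread-safe set of the names of the threads that ran the map function
  val threadNames = ConcurrentHashMap.newKeySet[String]()

  // Simulated slow step (assumption): Thread.sleep makes the five async
  // flows overlap in time; blocking a dispatcher thread is demo-only
  val slow = Flow[Int].map { x =>
    threadNames.add(Thread.currentThread().getName)
    Thread.sleep(100)
    x
  }.async

  // Sink.fold sums the elements so that we can wait for the stream to finish
  val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => sink =>
    import GraphDSL.Implicits._

    val bal = builder.add(Balance[Int](5))
    val merge = builder.add(Merge[Int](5))

    Source(1 to 10) ~> bal
    // Each iteration wires the next free Balance outlet through a new copy of slow
    for (_ <- 1 to 5) bal ~> slow ~> merge
    merge.out ~> sink.in

    ClosedShape
  })

  val sum = Await.result(graph.run(), 30.seconds)
  println(s"sum = $sum, distinct threads = ${threadNames.size}")
  system.terminate()
}
```

The sum only confirms that all ten elements went through; the interesting number is the count of distinct thread names, which grows once the flows actually run concurrently.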

For your specific case, the executed steps might be the following:

  • merge requests an element on the 1st inlet
  • balance serves an element through the 1st flow
  • merge requests an element on the 2nd inlet
  • balance serves an element through the 2nd flow
  • etc.

Now, if you change your Balance as follows:

val bal = builder.add(Balance[Int](5, waitForAllDownstreams = true))

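then Balance will not emit anything until all five downstream flows have requested elements. This makes it much more likely that the flows are busy at the same time and are therefore scheduled on different threads of the dispatcher's pool. The steps would be: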

  • merge requests an element on 1st inlet
  • merge requests an element on 2nd inlet
  • merge requests an element on 3rd inlet
  • merge requests an element on 4th inlet
  • merge requests an element on 5th inlet
  • balance starts serving elements through all flows
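The same idea appears in the Akka Streams cookbook recipe "Balancing jobs to a fixed pool of workers": wrap the worker in a Balance with waitForAllDownstreams = true plus a Merge, and give each copy of the worker its own async boundary. The sketch below applies it to the question's 10 elements and 5 flows. The balancer helper follows the shape of that recipe; ParallelBalanceDemo, the 100 ms Thread.sleep standing in for real work, and the Akka 2.4/2.5-era ActorMaterializer setup are assumptions. Each result pairs an element with the name of the thread that processed it, so you can check how the work was spread.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

// Hypothetical demo object (assumption: Akka 2.4/2.5-style materializer setup)
object ParallelBalanceDemo extends App {
  implicit val system = ActorSystem("Stream_PoC")
  implicit val materializer = ActorMaterializer()

  // Wraps `worker` in a Balance/Merge pair. Each copy of the worker gets its own
  // async boundary, and Balance waits until every copy has signalled demand.
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val bal = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))
      for (_ <- 1 to workerCount) bal ~> worker.async ~> merge
      FlowShape(bal.in, merge.out)
    })

  // Simulated work (assumption): record each element with the thread that handled it
  val worker = Flow[Int].map { x =>
    Thread.sleep(100)
    (x, Thread.currentThread().getName)
  }

  val results: Seq[(Int, String)] =
    Await.result(Source(1 to 10).via(balancer(worker, 5)).runWith(Sink.seq), 30.seconds)

  results.foreach { case (x, thread) => println(s"$x -> $thread") }
  system.terminate()
}
```

Merge emits elements in whatever order the workers finish, so the printed lines need not be in input order.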