0__ 0__ - 20 days ago 12
Scala Question

Akka Stream : dynamically grow outlets during graph builder phase

I understand in Akka Stream one outlet must be connected to one inlet and there is no automatic support for connecting multiple sinks to the same source. So you have to insert intermediate objects such as

Broadcast
.

I am transforming a signal processing DAG into a Akka Stream graph, and it would help me a lot if I could dynamically add sinks to sources as they are discovered on the traversal. If I have a custom
GraphStage
, can I have my own
Shape
whose
outlets
collection grows dynamically during the
Graph.create
phase? The normal DSL operation
~>
is backed by this call:

b.addEdge(importAndGetPort(b), to)


How does the builder "get" the
Outlet
here and would I be able to grow my shape on demand?




If this does not work, is it possible to "eject" a previous broadcast, disconnect its edges and wire them up with a new larger broadcast during the graph building?

Answer

The GraphDSL does not allow to mutate your shape dynamically.

However, since Akka 2.4.10 you can use BroadcastHub (and MergeHub).

BroadcastHub can give you a Sink that materializes into a Source. This Source can in turn be materialized as many times as needed to dynamically attach multiple subscribers.

So for a node of your DAG (e.g. having indegree=1 and outdegree=3), you could have something like

val hubSource = inEdgeSource.toMat(BroadcastHub.sink(bufferSize = ...))(Keep.right).run()

val nodeSink1 = hubSource.to(outEdgeSink1).run()
val nodeSink2 = hubSource.to(outEdgeSink2).run()
val nodeSink3 = hubSource.to(outEdgeSink3).run()

Akka docs:

http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Dynamic_fan-in_and_fan-out_with_MergeHub_and_BroadcastHub

Comments