Scala Question

Akka Stream: dynamically grow outlets during graph builder phase

I understand that in Akka Stream one outlet must be connected to exactly one inlet, and there is no automatic support for connecting multiple sinks to the same source. So you have to insert intermediate objects such as a broadcast.


I am transforming a signal processing DAG into an Akka Stream graph, and it would help me a lot if I could dynamically add sinks to sources as they are discovered during the traversal. If I have a custom shape, can its collection of outlets grow dynamically during the graph builder phase? The normal DSL operation ~> is backed by this call:

b.addEdge(importAndGetPort(b), to)

How does the builder "get" the port here, and would I be able to grow my shape on demand?

If this does not work, is it possible to "eject" a previous broadcast, disconnect its edges and wire them up with a new larger broadcast during the graph building?


GraphDSL does not allow you to mutate a shape dynamically: the number of ports is fixed when the shape is created.
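
If the out-degree of a node can be collected in a first pass over the DAG, before the graph is built, a fixed-size Broadcast is enough. The following is only a sketch under that assumption, written against Akka 2.6 (where the materializer is derived implicitly from the ActorSystem). The object name FixedFanOutSketch, the method run and the sample source are hypothetical and not part of the question.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FixedFanOutSketch {
  // Builds one source fanned out to `outDegree` sinks and returns what each sink collected.
  // Assumption: outDegree (>= 1) is known before the graph is built.
  def run(outDegree: Int): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("fan-out-sketch")
    try {
      // Hypothetical downstream edges: each one simply collects its input.
      val sinks = List.fill(outDegree)(Sink.seq[Int])

      val graph = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b => sinkShapes =>
        import GraphDSL.Implicits._
        // The number of outlets is fixed here and cannot grow afterwards.
        val bcast = b.add(Broadcast[Int](sinkShapes.size))
        Source(1 to 3) ~> bcast
        sinkShapes.foreach(s => bcast ~> s)
        ClosedShape
      })

      // Materialized value: one Future per sink, in the order of `sinks`.
      val results: Seq[Future[Seq[Int]]] = graph.run()
      results.map(f => Await.result(f, 5.seconds))
    } finally system.terminate()
  }
}
```

Calling FixedFanOutSketch.run(3) wires three sinks to the same source. This only helps when a two-pass traversal is acceptable; it does not add outlets after the Broadcast has been created.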

However, since Akka 2.4.10 you can use BroadcastHub (and MergeHub).

BroadcastHub can give you a Sink that materializes into a Source. This Source can in turn be materialized as many times as needed to dynamically attach multiple subscribers.

So for a node of your DAG (e.g. one with in-degree 1 and out-degree 3), you could write something like:

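// Materialize the hub once; the result is a Source that can be run repeatedly.
val hubSource = inEdgeSource.toMat(BroadcastHub.sink(bufferSize = ...))(Keep.right).run()

// Each run() attaches one more subscriber to the hub.
val nodeSink1 = hubSource.to(outEdgeSink1).run()
val nodeSink2 = hubSource.to(outEdgeSink2).run()
val nodeSink3 = hubSource.to(outEdgeSink3).run()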
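
A self-contained variant of the snippet above might look as follows. It is a sketch, assuming Akka 2.6 (implicit materializer from the ActorSystem, two-argument throttle). The object name BroadcastHubSketch, the method run, the throttled sample source and the buffer size of 8 are illustrative choices, not values from the answer. No expected output is shown, because a subscriber that attaches late may miss elements that were emitted earlier.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BroadcastHubSketch {
  // Attaches `subscribers` consumers to one hub and returns what each consumer saw.
  def run(subscribers: Int): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("hub-sketch")
    try {
      // Hypothetical incoming edge: a slow finite source, so subscribers have time to attach.
      val inEdgeSource = Source(1 to 5).throttle(1, 100.millis)

      // Running the hub sink yields a Source that can be materialized many times.
      // bufferSize must be a power of two; 8 is an arbitrary choice here.
      val hubSource =
        inEdgeSource.toMat(BroadcastHub.sink[Int](bufferSize = 8))(Keep.right).run()

      // Each runWith attaches one more subscriber while the stream is already running.
      val results: Seq[Future[Seq[Int]]] =
        (1 to subscribers).map(_ => hubSource.runWith(Sink.seq))

      results.map(f => Await.result(f, 5.seconds))
    } finally system.terminate()
  }
}
```

In a DAG traversal, the hub Source for each node could be stored in a map keyed by node, so that every newly discovered outgoing edge runs it once more. The hub applies backpressure according to its slowest subscriber, so the buffer size deserves some thought for real workloads.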
