Alexander Temerev - 6 months ago
Scala Question

Akka Streams split stream by type

I have the following simple case class hierarchy:

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

And I have a Flow[Message, Message, NotUsed] (from a WebSocket-based protocol with a codec already in place).

I want to demultiplex this flow into separate flows for the Foo and Baz types, as those are processed by completely different pathways.

What is the simplest way of doing that? Should be obvious, but I am missing something...


One way is to create a RunnableGraph that includes a Flow for each type of message, with a Partition stage routing every element to the matching branch.

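import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._  // brings the ~> operator into scope

  val in = Source(...)  // Some message source
  val out = Sink.ignore

  // collect narrows the element type: Flow[Message, Foo, NotUsed] and Flow[Message, Baz, NotUsed]
  val foo = builder.add(Flow[Message].collect { case f: Foo => f })
  val baz = builder.add(Flow[Message].collect { case b: Baz => b })
  // Route each message to output port 0 (Foo) or 1 (Baz)
  val partition = builder.add(Partition[Message](2, {
    case Foo(_) => 0
    case Baz(_) => 1
  }))

  in ~> partition
  partition.out(0) ~> foo ~> out  // insert other Flow[Foo] stages before out
  partition.out(1) ~> baz ~> out  // insert other Flow[Baz] stages before out
  ClosedShape
})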
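
For only two message types, a different technique that avoids the GraphDSL may be enough: the divertTo operator, which sends elements matching a predicate to a side Sink and passes the rest downstream. The following is a minimal sketch rather than a definitive implementation. It assumes Akka 2.6 or later (so that the implicit ActorSystem supplies the materializer), and the object name SplitByType, the helper split, and the Sink.seq sinks are hypothetical stand-ins for the real Foo and Baz pathways.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

object SplitByType {
  // Hypothetical helper: runs the stream and returns what each branch received.
  def split(messages: List[Message])(implicit system: ActorSystem): (Seq[Foo], Seq[Baz]) = {
    // Baz branch: narrow the element type, then gather the elements
    // (Sink.seq is a stand-in for the real Baz processing).
    val bazSink: Sink[Message, Future[Seq[Baz]]] =
      Flow[Message].collect { case b: Baz => b }.toMat(Sink.seq[Baz])(Keep.right)

    val (bazF, fooF) = Source(messages)
      .divertToMat(bazSink, _.isInstanceOf[Baz])(Keep.right) // Baz elements leave the main line here
      .collect { case f: Foo => f }                           // what remains is Foo; narrow the type
      .toMat(Sink.seq[Foo])(Keep.both)
      .run()

    // Blocking is acceptable in a demo; real code would compose the futures instead.
    (Await.result(fooF, 3.seconds), Await.result(bazF, 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("split-by-type")
    try println(split(List(Foo(1), Baz("a"), Foo(2))))
    finally system.terminate()
  }
}
```

With more than two message types, the Partition approach above probably scales better, since each additional type only needs another output port instead of another nested divertTo.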