cmbaxter - 1 month ago
Scala Question

How to split an inbound stream on a delimiter character using Akka Streams

I've been playing around with the experimental Akka Streams API a bit, and I have a use case that I wanted to see how to implement. I have a StreamTcp-based Flow that is fed by binding the input stream of connections to my server socket. The Flow operates on incoming ByteString data. That data contains a delimiter: everything before the delimiter should be treated as one message, and everything after it, up to the next delimiter, as the next message. Playing around with a simpler example, using no sockets and just static text, this is what I came up with:

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

    Flow(data).
      splitWhen(c => c == '.').
      foreach { producer =>
        Flow(producer).
          filter(c => c != '.').
          fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
          map(_.toString).
          filter(!_.isEmpty).
          foreach(println(_)).
          consume(FlowMaterializer(MaterializerSettings()))
      }.
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case any =>
          system.shutdown
      }
  }
}


The main function on the Flow that I found to accomplish my goal was splitWhen, which produces additional sub-flows, one for each message terminated by the '.' delimiter. I then process each sub-flow with another pipeline of steps, finally printing the individual messages at the end.
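
To see why the inner pipeline needs both filter steps, it can help to model the splitting on plain collections. The sketch below is not Akka code: splitWhenModel and messages are hypothetical names, and the model assumes that an element matching the predicate starts a new group and becomes that group's first element, which is how splitWhen is documented to behave.

```scala
object SplitWhenModel {
  // Hypothetical plain-collections model of splitWhen (an assumption, not Akka's code):
  // an element matching `p` opens a new group and is that group's first element.
  def splitWhenModel[A](xs: Seq[A])(p: A => Boolean): Vector[Vector[A]] =
    xs.foldLeft(Vector.empty[Vector[A]]) { (groups, x) =>
      if (groups.isEmpty || p(x)) groups :+ Vector(x)
      else groups.init :+ (groups.last :+ x)
    }

  // Mirrors the per-sub-flow steps from the question:
  // drop the delimiter, build a String, discard empty results.
  def messages(text: String): Vector[String] =
    splitWhenModel(text.toSeq)(_ == '.')
      .map(_.filter(_ != '.').mkString)
      .filter(_.nonEmpty)
}
```

Under this model, "ab.cd." splits into the groups "ab", ".cd" and ".". Each delimiter travels with the group that follows it, so it has to be filtered out, and the final group holds nothing but the delimiter, so the empty string it produces has to be dropped as well.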

This all seems a bit verbose for what I thought was a pretty simple and common use case. So my question is: is there a cleaner, less verbose way to do this, or is this the correct and preferred way to split a stream on a delimiter?

Answer

After posting this same question on the Akka User's Group, I got some suggestions from Endre Varga and Viktor Klang (https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE). I ended up going with Endre's suggestion of a Transformer, applied with the transform method on the Flow. A slightly modified version of my previous example is below:

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
import akka.stream.Transformer
import akka.util.ByteStringBuilder

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")                           
    implicit val mater = FlowMaterializer(MaterializerSettings())

    val data = List(
      ByteString("Lorem Ipsum is"), 
      ByteString(" simply.Dummy text of.The prin"), 
      ByteString("ting.And typesetting industry.")
    )
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_))
  }
}

With the definition of PeriodDelimitedTransformer being the following:

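class PeriodDelimitedTransformer extends Transformer[ByteString, String] {
  // Holds the bytes of a message whose closing delimiter has not arrived yet
  val buffer = new ByteStringBuilder

  def onNext(msg: ByteString) = {
    val msgString = msg.utf8String
    val delimIndex = msgString.indexOf('.')
    if (delimIndex == -1) {
      // No delimiter in this chunk: keep buffering and emit nothing
      buffer.append(msg)
      List.empty
    }
    else {
      // A limit of -1 keeps trailing empty strings, so parts.last is always
      // the (possibly empty) text after the final delimiter. Without it, a
      // chunk such as "abc." yields a single part that would be emitted twice.
      val parts = msgString.split("\\.", -1)

      // The first part completes whatever was buffered from earlier chunks
      buffer.putBytes(parts.head.getBytes("UTF-8"))
      val currentPiece = buffer.result.utf8String
      val otherPieces = parts.tail.dropRight(1).toList

      // Start over with the unfinished tail of this chunk
      buffer.clear
      buffer.putBytes(parts.last.getBytes("UTF-8"))

      currentPiece :: otherPieces
    }
  }
}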

Some of the complexity of my previous solution is now rolled up into this Transformer, but this seems like the best approach. My initial solution split the stream into multiple sub-streams, and that's not really what I wanted.
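
One thing to watch with the Transformer is that it decodes every chunk to a String before looking for the delimiter, so a multi-byte UTF-8 character that straddles two chunks would be decoded incorrectly. Here is a minimal sketch of the same buffering idea done on raw bytes, decoding only complete messages. DelimiterFramer and feed are hypothetical names, not part of Akka, and the sketch assumes a single-byte ASCII delimiter (safe in UTF-8, because the bytes of a multi-byte character never fall in the ASCII range).

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, not part of Akka: accumulates raw bytes and returns
// one decoded String per complete, delimiter-terminated message.
final class DelimiterFramer(delimiter: Byte = '.'.toByte) {
  // Bytes received after the last delimiter seen so far
  private var pending: Array[Byte] = Array.emptyByteArray

  def feed(chunk: Array[Byte]): List[String] = {
    val data = pending ++ chunk
    val frames = List.newBuilder[String]
    var start = 0
    var i = data.indexOf(delimiter, start)
    while (i >= 0) {
      // Decode only the complete message, excluding the delimiter itself
      frames += new String(data, start, i - start, UTF_8)
      start = i + 1
      i = data.indexOf(delimiter, start)
    }
    // Keep the unfinished tail for the next chunk
    pending = data.drop(start)
    frames.result()
  }
}
```

Inside onNext, this would amount to calling something like framer.feed(msg.toArray) and returning the resulting list. Feeding it the three chunks from the example above yields nothing for the first chunk, then "Lorem Ipsum is simply" and "Dummy text of", then "The printing" and "And typesetting industry".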
