salyh salyh - 1 year ago 190
Scala Question

Consume TCP stream and redirect it to another Sink (with Akka Streams)

I am trying to redirect/forward a TCP stream to another Sink with Akka 2.4.3.
The program should open a server socket, listen for incoming connections and then consume the TCP stream. Our sender does not expect or accept replies from us, so we never send anything back; we just consume the stream. After framing the TCP stream we need to transform the bytes into something more useful and send it to the Sink.

This is what I have tried so far. I am struggling with two parts in particular: how to avoid sending TCP packets back to the sender, and how to connect the Sink properly.

import java.nio.ByteOrder

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Sink, Tcp}
import akka.util.ByteString

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)

  // The Sink
  // In reality this is of course a real Sink doing some useful things :-)
  // The Sink accepts elements of type SomethingMySinkUnderstand
  val mySink = Sink.ignore

  def main(args: Array[String]): Unit = {
    // Our sender is not interested in getting replies from us,
    // so we just want to consume the TCP stream and never send anything back
    val (address, port) = ("", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
        // This is necessary since we use a self-developed TCP wire protocol
        .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
        // Here we want to map the raw bytes into something our Sink understands
        .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
        // Here we would like to connect our Sink to the TCP source
        .to(mySink) // <------ NOT COMPILING
    }

    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
    }
  }
}

class SomethingMySinkUnderstand(x: String) {
}


Update: Add this to your build.sbt file to get the necessary dependencies:

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

Answer

handleWith expects a Flow, i.e. a box with an unconnected inlet and an unconnected outlet. You effectively provide a Sink, because you connected the Flow's outlet to a Sink by using the to operation, which leaves only the inlet open.
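To make the shape mismatch concrete, here is a minimal sketch of the types involved. The object name `ShapeSketch` and the values `open` and `closed` are made up for illustration, and `Sink.ignore` stands in for the real sink, as in the question. Nothing is materialized, so no ActorSystem is needed to compile it.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

object ShapeSketch {
  // Stand-in for the real sink (assumption: same placeholder as in the question).
  val mySink = Sink.ignore

  // A Flow has one open inlet and one open outlet.
  val open: Flow[ByteString, String, NotUsed] =
    Flow[ByteString].map(_.utf8String)

  // `to` plugs the outlet into the sink, so only the inlet remains open:
  // the result is a Sink, not a Flow.
  val closed: Sink[ByteString, NotUsed] = open.to(mySink)

  // conn.handleWith(closed) // would not compile: handleWith needs a
  //                         // Flow[ByteString, ByteString, _]
}
```

The explicit type annotations are only there so the compiler confirms each shape; in real code they can usually be left to inference.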

I think you could do the following:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)
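A different way to get a consume-only handler, not the approach used in the snippet above, is to build the Flow from an independent Sink and Source with `Flow.fromSinkAndSource`, pairing the inbound pipeline with `Source.maybe`, which emits nothing unless its promise is completed. The following is only a sketch under assumptions: the object `ConsumeOnly`, the method `handler` and the case class are hypothetical names, and the framing parameters are copied from the question.

```scala
import java.nio.ByteOrder

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString

object ConsumeOnly {
  // Hypothetical stand-in for the message type from the question.
  final case class SomethingMySinkUnderstand(x: String)

  // Builds a connection handler that feeds decoded messages into `target`
  // and never writes anything back to the client.
  def handler[M](
      target: Sink[SomethingMySinkUnderstand, M]
  ): Flow[ByteString, ByteString, NotUsed] = {
    // Inbound side: frame, decode, and hand over to the target sink.
    val inbound: Sink[ByteString, NotUsed] =
      Flow[ByteString]
        .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
        .map(msg => SomethingMySinkUnderstand(msg.utf8String))
        .to(target)

    // Outbound side: a source that stays silent, so no bytes are sent.
    Flow.fromSinkAndSource(inbound, Source.maybe[ByteString])
  }
}
```

It would be used as `conn.handleWith(ConsumeOnly.handler(mySink))`. One design difference to keep in mind: the two sides of such a Flow complete independently, so how the connection is torn down when the client disconnects may differ from the `filter`-based version and is worth checking against your setup.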