wojciechka wojciechka - 13 days ago 6
Scala Question

reading TCP as client via akka-stream

I'm new to akka-stream and attempting to use the library as a client to read (and only read) from a remote TCP server socket.

My attempts however fail as no incoming ByteStrings are being processed.

Neither:

Source.empty[ByteString]
  .via(Tcp().outgoingConnection(address, port))
  .to(Sink.foreach(println(_))).run()


nor:

val connection = Tcp().outgoingConnection(address, port)
val sink = Flow[ByteString]
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))
  .map(_.utf8String)
  .to(Sink.foreach(println(_)))
connection.runWith(Source.empty, sink)


works.

What are the reasons for the flows not being run? All hints much appreciated.

Answer

As @ViktorKlang mentioned, the empty Source on the client side is the problem: with it, the flow essentially won't be run. You can set up the pipeline so that the client only receives by using a Source that emits a single empty ByteString instead of an empty Source, as the example below demonstrates:

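import java.net.InetSocketAddress
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source, Tcp}
import akka.util.ByteString

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()

// Server: send "foo" to each connected client once per second and ignore anything the client sends.
val server = Tcp().bind("127.0.0.1", 2555)
server.runForeach { conn =>
  Source.tick(1.second, 1.second, "foo")
    .map(f => ByteString(f))
    .via(conn.flow)
    .to(Sink.ignore).run()
}

// Client: feed the connection one empty ByteString instead of an empty Source.
val clientFlow = Tcp().outgoingConnection(new InetSocketAddress("127.0.0.1", 2555))
clientFlow.runWith(Source.single(ByteString.empty), Sink.foreach(bs => println("client received: " + bs.utf8String)))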

If you do this, then you will start receiving data from the server every second in accordance with the tick source created on the server side.
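
Another option that may suit a read-only client is Source.maybe, which emits nothing and does not complete until its promise is completed, so the client never writes to the connection. The following is only a sketch under some assumptions: Akka 2.4.2 or later (for Sink.seq), the same local address and port as above, and illustrative names (ReadOnlyTcpClient, startServer, readLines) that do not come from the original answer. Unlike the server above, this one sends "foo\n" with a trailing newline so that the Framing.delimiter stage from the question can split the stream into lines.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source, Tcp}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ReadOnlyTcpClient {
  implicit val system: ActorSystem = ActorSystem("read-only-client")
  // On Akka 2.6+ the implicit ActorSystem alone should be enough as a materializer.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Assumed address, matching the example above.
  val host = "127.0.0.1"
  val port = 2555

  /** Starts a demo server that sends "foo\n" to each client once per second. */
  def startServer(): Future[Tcp.ServerBinding] =
    Tcp().bind(host, port)
      .to(Sink.foreach { conn =>
        Source.tick(1.second, 1.second, ByteString("foo\n"))
          .via(conn.flow)
          .runWith(Sink.ignore)
      })
      .run() // materializes to the server binding

  /** Connects without ever writing and collects the first `n` received lines. */
  def readLines(n: Int): Future[Seq[String]] =
    Source.maybe[ByteString] // emits nothing and stays open
      .via(Tcp().outgoingConnection(host, port))
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
      .map(_.utf8String)
      .take(n) // cancels upstream, closing the connection, after n lines
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    import system.dispatcher
    // Wait for the server to be bound before the client connects.
    val lines = startServer().flatMap(_ => readLines(3))
    println(Await.result(lines, 10.seconds))
    system.terminate()
  }
}
```

The take(n) stage is only there so the demo terminates; for a long-running reader you would drop it and attach something like Sink.foreach instead.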