Rabzu - 1 year ago
Scala Question

Chain Akka Streams Kafka with Akka HTTP via Source[ByteString]

I'm receiving a file as a ByteString from a Kafka Reactive Streams consumer, and I want to send it to a service.

Is there a way to extract a Source[ByteString, Any] from the Kafka Reactive Streams consumer, so that I can chain the stream from Kafka to Akka HTTP without loading an entire ByteString in memory before making the Akka HTTP request?

Answer Source

The entity of an HttpRequest can be built from a Source[ByteString, _]. All that is really necessary is to convert your ByteString values to ChunkStreamPart values:

import akka.http.scaladsl.model.{ContentType, HttpMethods, HttpRequest, Uri}
import akka.http.scaladsl.model.HttpEntity.{Chunked, ChunkStreamPart}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

val kafkaSource: Source[ByteString, _] = ???

// wrap each ByteString in a ChunkStreamPart
val byteStrToChunk = Flow[ByteString].map(bytes => ChunkStreamPart(bytes))

val chunkSource: Source[ChunkStreamPart, _] = kafkaSource via byteStrToChunk

// the content type of your ByteString, e.g. ContentTypes.`application/json`
val contentType: ContentType = ???

val entity = Chunked(contentType, chunkSource)

val uri: Uri = ??? // service URI

val request = HttpRequest(method = HttpMethods.PUT, uri = uri, entity = entity)
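If you would rather not do the chunk conversion by hand, here is a minimal sketch of the same idea using the HttpEntity(contentType, source) overload, which returns a Chunked entity directly and, I believe, also drops empty ByteStrings for you. The object name StreamingUpload and its two helper methods are made up for illustration, and the sketch assumes a recent Akka HTTP (10.2 or later) with an implicit classic ActorSystem in scope.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical helper object, not part of Akka HTTP.
object StreamingUpload {

  // Builds a PUT request whose body is streamed from `data`.
  // HttpEntity(contentType, source) produces a Chunked entity.
  def buildRequest(data: Source[ByteString, Any],
                   contentType: ContentType,
                   uri: Uri): HttpRequest =
    HttpRequest(
      method = HttpMethods.PUT,
      uri = uri,
      entity = HttpEntity(contentType, data)
    )

  // Sends the request. The source is only pulled once the request
  // is actually being written to a connection.
  def upload(data: Source[ByteString, Any],
             contentType: ContentType,
             uri: Uri)(implicit system: ActorSystem): Future[HttpResponse] =
    Http().singleRequest(buildRequest(data, contentType, uri))
}
```

With an implicit ActorSystem available, something like StreamingUpload.upload(kafkaSource, contentType, uri) would then stream the Kafka data to the service. Remember to consume or discard the response entity afterwards, otherwise the connection can stall.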

This pattern also meets your requirement of "without loading an entire ByteString in memory". Building the entity does not consume the Source, and the data is never collected into your application's memory as a whole. Instead, your app passes chunks from Kafka along to the service as the service signals demand. A truly reactive paradigm...
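To convince yourself that nothing is read up front, here is a small sketch that stands in for the Kafka source with a counting Source. Everything in it (the LazyEntityDemo object, the fakeKafkaSource name, the localhost URI and the octet-stream content type) is an assumption for illustration only, not something from your setup.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest}
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical demo object: a stand-in source that counts emitted elements.
object LazyEntityDemo {

  // Incremented every time the source actually produces an element.
  val pulled = new AtomicInteger(0)

  // Placeholder for the real Kafka source.
  val fakeKafkaSource: Source[ByteString, _] =
    Source(1 to 1000).map { i =>
      pulled.incrementAndGet()
      ByteString(s"chunk-$i\n")
    }

  // Creating the entity and the request only describes the stream.
  val entity = HttpEntity(ContentTypes.`application/octet-stream`, fakeKafkaSource)

  val request = HttpRequest(
    method = HttpMethods.PUT,
    uri = "http://localhost:8080/upload", // placeholder URI
    entity = entity
  )
}
```

After the request has been constructed, LazyEntityDemo.pulled.get is still 0, because a Source is only a blueprint until something materializes it. Elements start flowing when the request is sent, and then only as fast as the connection asks for them.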
