Christopher Chiche - 3 years ago
Scala Question

Kafka topic to websocket

I am trying to implement a setup where I have multiple web browsers open a websocket connection to my akka-http server in order to read all messages posted to a kafka topic.

So the stream of messages should flow this way:

kafka topic -> akka-http -> websocket connection 1
                         -> websocket connection 2
                         -> websocket connection 3

For now I have created a path for the websocket:

val route: Route =
path("ws") {

Then I have created a consumer for my kafka topic:

val consumerSettings = ConsumerSettings(system,
    new ByteArrayDeserializer, new StringDeserializer)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("topic1"))

Finally, I want to connect this source to the websocket in handleWebSocketMessages:

def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged

Here is the error I get when I try to use source in the TextMessage:

Error:(77, 9) overloaded method value apply with alternatives:
(text: String)
cannot be applied to ([org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control])

I think I'm making numerous mistakes along the way but I would say that the most blocking part is the

Answer

The first thing to understand is that source is of type Source[ConsumerRecord[K, V], Control]. A Source is a stream of records, not a String, so it cannot be passed as an argument to TextMessage.
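To make the type mismatch concrete, here is a minimal sketch of converting the elements of a Source one by one instead of passing the Source itself to TextMessage. It assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer) and uses a plain list of strings as a stand-in for the Kafka record values; RecordsToMessages and toMessages are hypothetical names chosen for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RecordsToMessages {
  // TextMessage(...) accepts a String, not a Source, so convert element by element.
  def toMessages[Mat](values: Source[String, Mat]): Source[Message, Mat] =
    values.map(value => TextMessage(value))

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, where the implicit ActorSystem also supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("sketch")

    // Stand-in for the values of the Kafka records (assumption), so no broker is needed.
    val values: Source[String, NotUsed] = Source(List("a", "b", "c"))

    val messages = Await.result(toMessages(values).runWith(Sink.seq), 3.seconds)
    messages.foreach(println)
    system.terminate()
  }
}
```

With the real Kafka source, the same map would be applied to each ConsumerRecord, taking its value (a String in the question's settings) rather than the record itself.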

Now, from the point of view of the WebSocket:

  • An outgoing message is built for each message in the Kafka source. The message will be a TextMessage from a String transformation of the Kafka message.
  • For each incoming message, just println() it

So the Flow can be seen as two components: the Source and the Sink.

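// Incoming side: print each message sent by the browser, then discard it.
val incomingMessages: Sink[Message, NotUsed] =
  Flow[Message].map(message => println(message)).to(Sink.ignore)

// Outgoing side: turn each Kafka record into a TextMessage built from its String value.
// `source` is the Kafka source from the question; it materializes a Consumer.Control.
val outgoingMessages: Source[Message, Consumer.Control] =
  source.map { consumerRecord => TextMessage(consumerRecord.value) }

val handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow.fromSinkAndSource(incomingMessages, outgoingMessages)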
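As a rough sketch of how a flow built with Flow.fromSinkAndSource could be plugged into the "ws" path from the question: the example below uses a finite in-memory source as a stand-in for the mapped Kafka source (an assumption, so it compiles without a broker), and the names WsRouteSketch and wsFlow are hypothetical. The flow is called wsFlow rather than handleWebSocketMessages so that it does not shadow the akka-http directive of the same name.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink, Source}

object WsRouteSketch {
  // Stand-in for the mapped Kafka source (assumption): two fixed text messages.
  val outgoingMessages: Source[Message, NotUsed] =
    Source(List("first", "second")).map(text => TextMessage(text))

  // Messages sent by the browser are discarded in this sketch.
  val incomingMessages: Sink[Message, Any] = Sink.ignore

  // The two halves are independent: what comes in does not affect what goes out.
  val wsFlow: Flow[Message, Message, Any] =
    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

  // The handler flow is materialized once per WebSocket connection.
  val route: Route =
    path("ws") {
      handleWebSocketMessages(wsFlow)
    }
}
```

Because the flow is materialized per connection, using the Kafka source here would likely start a separate consumer for each browser; whether every browser then sees every message depends on the consumer group configuration, which the question does not show.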

Hope it helps.
