Christopher Chiche - 3 years ago
Scala Question

Kafka topic to websocket

I am trying to implement a setup where I have multiple web browsers open a websocket connection to my akka-http server in order to read all messages posted to a kafka topic.

So the stream of messages should flow this way:

kafka topic -> akka-http -> websocket connection 1
                         -> websocket connection 2
                         -> websocket connection 3


For now I have created a path for the websocket:

val route: Route =
  path("ws") {
    handleWebSocketMessages(notificationWs)
  }


Then I have created a consumer for my kafka topic:

val consumerSettings = ConsumerSettings(system,
    new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("topic1"))


Finally, I want to connect this source to the websocket in handleWebSocketMessages:

def handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(source)::Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }


Here is the error I get when I try to use source in the TextMessage:


Error:(77, 9) overloaded method value apply with alternatives:
(textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage
(text: String)akka.http.scaladsl.model.ws.TextMessage.Strict
cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control])
TextMessage(source)::Nil


I think I'm making numerous mistakes along the way, but I would say that the biggest blocker is handleWebSocketMessages.

Answer

The first thing to understand is that source has type Source[ConsumerRecord[K, V], Control] (here, Source[ConsumerRecord[Array[Byte], String], Consumer.Control]), so it cannot be passed as an argument to TextMessage.
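
To make the mismatch concrete, here is a minimal sketch of the two TextMessage.apply overloads listed in the compiler error. The object name TextMessageOverloads and the sample strings are illustrative only and do not come from the question.

```scala
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.scaladsl.Source

object TextMessageOverloads {
  // Overload 1: a plain String yields a strict (fully in-memory) message.
  val strict: TextMessage.Strict = TextMessage("hello")

  // Overload 2: a Source[String, Any] yields ONE message whose text arrives in chunks.
  val streamed: TextMessage = TextMessage(Source(List("hel", "lo")))

  // A Source[ConsumerRecord[Array[Byte], String], Consumer.Control] matches neither
  // overload, which is why TextMessage(source) is rejected by the compiler.
}
```

Note that even a Source[String, Any] passed to the second overload would produce a single streamed message rather than one message per Kafka record, so mapping each record to its own TextMessage is the more natural fit here.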

Now, look at it from the point of view of the WebSocket:

  • An outgoing message is built for each message in the Kafka source. The message will be a TextMessage from a String transformation of the Kafka message.
  • For each incoming message, just println() it

So the Flow can be seen as two components: the Source and the Sink.

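import akka.Done
import scala.concurrent.Future

// Incoming WebSocket messages are only printed.
val incomingMessages: Sink[Message, Future[Done]] =
  Sink.foreach[Message](println(_))

// One outgoing TextMessage per Kafka record; the value is already a String
// because the consumer uses a StringDeserializer.
val outgoingMessages: Source[Message, Consumer.Control] =
  source
    .map { consumerRecord => TextMessage(consumerRecord.value()) }

val handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow.fromSinkAndSource(incomingMessages, outgoingMessages)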
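
One caveat for the multi-browser goal in the question: a flow built directly on the Kafka source is materialized once per WebSocket connection, so each connection starts its own consumer in the same group ("group1") and Kafka divides the partitions among them instead of delivering every message to every browser. A possible way around this, sketched here under assumptions, is to run the Kafka source once and fan it out with a BroadcastHub. The names BroadcastSketch, broadcastFlow and upstream, and the buffer size of 256, are hypothetical and not part of the original answer.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

object BroadcastSketch {
  // Runs `upstream` once and returns a handler that every WebSocket connection can reuse.
  // `upstream` stands in for the Kafka source mapped to its String values (an assumption).
  def broadcastFlow(upstream: Source[String, Any])(
      implicit mat: Materializer): Flow[Message, Message, Any] = {
    // Materializing the hub yields a Source that can be attached to many times.
    val hub: Source[String, NotUsed] =
      upstream.toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.right).run()

    // Incoming messages are discarded in this sketch; outgoing ones come from the hub.
    Flow.fromSinkAndSource[Message, Message](Sink.ignore, hub.map(text => TextMessage(text)))
  }
}
```

With the question's definitions this could be wired as val notificationWs = BroadcastSketch.broadcastFlow(source.map(_.value())) and passed to the handleWebSocketMessages directive in the route. A connection that attaches late only sees records emitted after it joined, and the hub proceeds at the pace of its slowest subscriber, so this is a starting point rather than a complete design.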

Hope it helps.
