Knows Not Much Knows Not Much - 2 years ago 227
Scala Question

Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds

I am trying to write a simple consumer of messages from kafka using akka streams.

build.sbt

"com.typesafe.akka" %% "akka-stream-kafka" % "0.17"


My code

object AkkaStreamskafka extends App {

// producer settings
implicit val system = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer))
.withBootstrapServers("foo:9092")
.withGroupId("abhi")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

val source = Consumer
.committableSource(consumerSettings, Subscriptions.topics("my-topic))
val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1){msg =>
msg.committableOffset.commitScaladsl().map(_ => msg.record.value);
}
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder =>
s =>
import GraphDSL.Implicits._
source ~> flow ~> s.in
ClosedShape
})
val future = graph.run()
Await.result(future, Duration.Inf)
}


But I get an error

[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7]
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout.
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds


Edit:

I can do a
ssh foo
and then type the following command on the server terminal
./kafka-console-consumer --zookeeper localhost:2181 --topic my-topic
and I can see data. So I guess my server name
foo
is correct and kafka is up and running on that machine.

Edit2:

On the Kafka Server I am running Cloudera 5.7.1. Kafka version is jars/kafka_2.10-0.9.0-kafka-2.0.0.jar

Answer Source

I was able to solve the problem myself.

The library "com.typesafe.akka" %% "akka-stream-kafka" only works for Kafka 0.10 and beyond. it does not work for earlier versions of Kafka. When I listed the kafka jars on my kafka server I found that I am using Cloudera 5.7.1 which comes with Kafka 0.9.

In order to create a Akka Streams source for this version. I needed to use

"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0"

They also have an example there https://github.com/kciesielski/reactive-kafka

This code worked perfectly for me

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  bootstrapServers = "foo:9092",
  topic = "my-topic",
  groupId = "abhi",
  valueDeserializer = new StringDeserializer()
)

val source = Source.fromPublisher(kafka.consume(consumerProperties))
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value())
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) {implicit builder =>
 s =>
   import GraphDSL.Implicits._
    source ~> flow ~> s.in
   ClosedShape
})
val future = graph.run()
future.onComplete{_ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download