
Using Spark StreamingContext to Consume from Kafka topic

I am brand new to Spark and Kafka, and I'm trying to get some Scala code (running as a Spark job) to act as a long-running process (not just a short-lived/scheduled task) that continuously polls a Kafka broker for messages. When it receives messages, I just want them printed to the console/STDOUT. Again, this needs to be a long-running process that basically (tries to) live forever.

After doing some digging, it seems like a StreamingContext is what I want to use. Here's my best attempt:

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
  val topicsSet = kafkaTopics.split(",").toSet
  // The 0.8 direct stream only needs the broker list; keys and values are
  // decoded by the StringDecoder type parameters below, so producer
  // serializer settings have no effect here
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
}

def processEngine(): StreamingContext = {
  // sc is the SparkContext that the spark-shell/notebook already provides
  val ssc = new StreamingContext(sc, Seconds(1))

  // print() is an output operation that returns Unit, so there is
  // nothing useful to bind to a val
  createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()

  ssc
}

// Stop any previously active StreamingContext, keeping the SparkContext alive
StreamingContext.getActive.foreach {
  _.stop(stopSparkContext = false)
}

val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()


When I run this, I get no exceptions/errors, but nothing seems to happen. I can confirm there are messages on the topic. Any ideas as to where I'm going awry?

Answer

When you print from inside foreachRDD, the output goes to the stdout of the worker (executor) nodes, not the driver. I'm assuming you're looking at the driver's console output. You can use DStream.print instead, which prints the first few elements of every batch on the driver:

val ssc = new StreamingContext(sc, Seconds(1))
createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
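
If you do want to keep foreachRDD and still see output on the driver, you first have to bring a bounded sample of each RDD back to the driver. A minimal sketch; the take(10) cap is an arbitrary choice of mine to avoid collecting a large RDD into driver memory:

createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").foreachRDD { rdd =>
  // take(10) computes on the executors but returns the results to the
  // driver, so this println shows up in the driver's console
  rdd.take(10).foreach { case (key, value) =>
    println(s"key=$key value=$value")
  }
}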

Also, don't forget to call ssc.awaitTermination() after ssc.start():

ssc.start()
ssc.awaitTermination()

As a side note, I'm assuming you copy-pasted this from an example, but there's no need to call transform on the DStream if you're not actually planning to do anything with the OffsetRanges.
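
For completeness, here is roughly what that transform pattern is for, sketched along the lines of the Spark Streaming + Kafka docs for the 0.8 direct stream. It's only worth keeping if you actually use the per-partition offsets, e.g. to track or commit them yourself:

import org.apache.spark.streaming.kafka.HasOffsetRanges

createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").transform { rdd =>
  // RDDs produced by the direct stream implement HasOffsetRanges,
  // so this cast is safe at this point in the chain
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // The transform body runs on the driver once per batch
  offsets.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
  rdd
}.print()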