
Spark Streaming Update on Kafka Direct Stream parameter

I have the following code:

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

//Set basic spark parameters
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple4[String, Int, Long, String]](
  ssc, getKafkaBrokers, getKafkaTopics("processed"),
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.offset, mmd.message().toString))

getKafkaBrokers and getKafkaTopics call an API that checks the database for new topics as we add them to our system. Does the StreamingContext re-evaluate these variables on each batch interval, so that messagesDStream gets re-created with the new values each time?

It does not look like it does; is there any way to make that happen?


Tathagata Das, one of the creators of Spark Streaming, answered a similar question on the Spark user mailing list regarding modification of existing DStreams:

Currently Spark Streaming does not support addition/deletion/modification of DStream after the streaming context has been started. Nor can you restart a stopped streaming context. Also, multiple spark contexts (and therefore multiple streaming contexts) cannot be run concurrently in the same JVM.

I don't see a straightforward way of implementing this with Spark Streaming, because you have no way of updating the DStream graph once the context has started; you need much more control than is currently available. One option might be a solution based on Reactive Kafka, the Akka Streams connector for Kafka, or any other streaming solution where you control the source. A rough sketch follows.
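
As an illustration only, here is a minimal sketch using Reactive Kafka (akka-stream-kafka). The broker string, group id, and the topic pattern "processed.*" are assumptions, not values from your code; subscribing by a pattern lets the underlying Kafka consumer pick up newly created topics that match it, without rebuilding a static stream graph:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system = ActorSystem("kafka-consumer")
implicit val materializer = ActorMaterializer()

// Assumed broker list; in your setup this could come from getKafkaBrokers
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("broker1:9092,broker2:9092")
  .withGroupId("processed-consumer")

// Pattern subscription: topics created later that match the regex are
// picked up automatically on the consumer's next metadata refresh
Consumer.plainSource(consumerSettings, Subscriptions.topicPattern("processed.*"))
  .map(record => (record.topic(), record.partition(), record.offset(), record.value()))
  .runWith(Sink.foreach(println))

Because you own this source directly, you could also tear down and recreate just this consumer stream when your database lookup reports new topics, rather than restarting an entire streaming context.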