ike_love - 3 months ago
Scala Question

How to use Spark Structured Streaming with Kafka Direct Stream?

I came across Structured Streaming in Spark, which has an example of continuously consuming from an S3 bucket and writing the processed results to a MySQL database.

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
  .writeStream.format("jdbc")
  .start("jdbc:mysql://...")


How can this be used with Spark Kafka Streaming?

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)


Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?

Answer

Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?

Not yet. Spark 2.0.0 does not have a Kafka source for Structured Streaming, so a Kafka topic cannot be read through spark.readStream in that release. According to Tathagata Das, one of the creators of Spark Streaming, this feature should come out in Spark 2.1.0.
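
For reference, here is a minimal sketch of how the two examples might be combined once a release that ships the Kafka source is available (Spark 2.1.0 or newer, with the separate spark-sql-kafka-0-10 artifact on the classpath). It does not work on Spark 2.0.0. The broker address, the topic name "logs", the object name, and the choice to treat each message value as the "action" are assumptions made for illustration, and the console sink stands in for the MySQL write in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object KafkaStructuredStreamingSketch {
  // Hypothetical connection settings; replace with your own brokers and topic.
  val kafkaOptions: Map[String, String] = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "logs"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaStructuredStreamingSketch")
      .getOrCreate()
    import spark.implicits._

    // Read continuously from Kafka instead of S3. The Kafka source exposes
    // binary key/value columns plus metadata such as topic and timestamp.
    val kafkaDF = spark.readStream
      .format("kafka")
      .options(kafkaOptions)
      .load()

    // Assumption: each message value is a plain-string action name.
    // The Kafka record timestamp plays the role of the "time" column.
    val counts = kafkaDF
      .selectExpr("CAST(value AS STRING) AS action", "timestamp")
      .groupBy($"action", window($"timestamp", "1 hour"))
      .count()

    // Print the running counts to the console; no foreachRDD is involved.
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

The DataFrame operations are the same as in the S3 example; only the source definition changes, which is why no DStream or foreachRDD call appears.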