Юлия Вовк - 1 year ago
Scala Question

Saving Kafka messages consumed by Spark Streaming in a Zeppelin notebook

I have a problem saving Kafka messages consumed by Spark Streaming in a Zeppelin notebook.

My code is:

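import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods  // assumption: the jackson backend of json4s

case class Message(id: Long, message: String, timestamp: Long) extends Serializable

val ssc = new StreamingContext(sc, Seconds(2))

// Read (key, value) pairs from the "test" topic, parse each JSON value, keep even ids.
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
    Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
    Map("test" -> 4),
    StorageLevel.MEMORY_ONLY)  // assumption: this argument is missing from the post as shown
  .map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
  .filter(_.id % 2 == 0)

// Register each 10-second window as the temp table "messages".
val mes = messagesStream.window(Seconds(10))
  .map(m => Message(m.id, m.message, m.timestamp))
  .foreachRDD(rdd => rdd.toDF.registerTempTable("messages"))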


When I run
%sql select * from messages
it shows no data, although the table is defined. If I save to Cassandra instead of registering a temp table, the data is saved and displayed correctly. I don't understand why.

Thanks for the help.

Answer

OK, here is the problem. Let's first review the definition of the foreachRDD operator:

foreachRDD is not being used the way it is intended to be used. It is the most generic output operator: it applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

So what's actually happening with your code is the following:

DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application has no output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them (which is your case, since registerTempTable is not an action), then nothing will get executed. The system will simply receive the data and discard it.

So the data in your RDDs is discarded each time you call registerTempTable, which is why your SQL query returns an empty result.
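The lazy execution described above can be seen in miniature without Spark. What follows is only an analogy in plain Scala, not Spark code: Iterator.map stands in for a transformation and toList stands in for an action. The names LazyDemo, processed and pipeline are made up for this sketch.

```scala
// Plain-Scala analogy (no Spark involved): Iterator.map is lazy,
// much like an RDD transformation that waits for an action.
object LazyDemo {
  // Counts how many elements the mapping function has actually touched.
  var processed = 0

  // Only describes the computation; no element is processed here.
  def pipeline(): Iterator[Int] =
    Iterator(1, 2, 3, 4).map { x => processed += 1; x * 10 }

  def main(args: Array[String]): Unit = {
    val it = pipeline()
    println(s"after map: processed = $processed")   // processed is still 0

    // toList plays the role of the "action": it forces the evaluation.
    val result = it.toList
    println(s"after toList: processed = $processed, result = $result")
  }
}
```

Until toList is called, the mapping function never runs. In the same way, a foreachRDD body that only defines or registers something, without an RDD action, gives Spark no reason to compute the batch.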

To solve your problem, you'll need to save your data somewhere (Cassandra is a good choice) and then query it.
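As a rough sketch of "saving the data somewhere", the snippet below writes each non-empty batch to text files, one of the external systems named in the foreachRDD definition. It is an illustration under assumptions, not the asker's setup: the object SaveBatches, the helper saveEachBatch, the baseDir parameter and the batch-<timestamp> directory naming are all hypothetical. With Cassandra the shape would be the same, with the connector's save call in place of saveAsTextFile.

```scala
import org.apache.spark.streaming.dstream.DStream

object SaveBatches {
  // Hypothetical helper: writes every non-empty batch of `stream`
  // into its own directory under `baseDir`.
  def saveEachBatch[T](stream: DStream[T], baseDir: String): Unit =
    stream.foreachRDD { (rdd, time) =>
      // isEmpty() and saveAsTextFile() are both RDD actions,
      // so they force the batch to be computed.
      if (!rdd.isEmpty()) {
        // One directory per batch, because saveAsTextFile refuses
        // to write into a directory that already exists.
        rdd.saveAsTextFile(s"$baseDir/batch-${time.milliseconds}")
      }
    }
}
```

It could be called as SaveBatches.saveEachBatch(messagesStream.window(Seconds(10)), "/tmp/messages") before ssc.start(), where the path is again only an example. The saved files can then be loaded and queried separately from the streaming job.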
