nilesh1212 - 16 days ago
Scala Question

Spark Streaming Windowing output

I am working on a Spark Streaming application in which I need to print the min and max values of a JSON attribute. The min and max should be computed over a window of 20 seconds, sliding every 2 seconds.
Basically (for a POC) I want to print the min and max on the Spark UI under the job group set by

sparkContext.setJobGroup("count-min-max", "count-min-max value of quality attribute")

which should refresh on the Spark UI every 20 seconds.

Below is my code. I am able to get min, max, and count, but the print runs every 2 seconds (the streaming batch interval), not on the 20-second window.

val ssc = new StreamingContext(sparkContext, Seconds(2))

val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines = record.map(_._2)

// val jsonCounts = lines.map(jsonRecord => parseJson(jsonRecord)).map(_.mkString("\n")).print()

val valueDStream: DStream[Array[Double]] = lines
  .map(jsonRecord => parseJson(jsonRecord))
  .window(Seconds(20), Seconds(2))

valueDStream.foreachRDD { rdd =>
  if (!rdd.partitions.isEmpty) {
    val stats = rdd.flatMap(x => x)
    println(stats.count() + "-" + stats.min() + "-" + stats.max())
  }
}

ssc.start()
ssc.awaitTermination()
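For reference, the print cadence I observe follows the slide interval, not the window length: with .window(Seconds(20), Seconds(2)), a windowed RDD (and hence a println) is produced every 2 seconds. A toy timeline in plain Scala (illustrative numbers only, not Spark code):

```scala
val windowLength = 20  // seconds of data each computation covers
val slideInterval = 2  // seconds between successive window computations

// Times (in seconds) at which a window computation fires during a 10 s run:
val fireTimes = (slideInterval to 10 by slideInterval).toList
println(fireTimes)                     // List(2, 4, 6, 8, 10)

// Each firing still covers windowLength seconds of (overlapping) data:
println(windowLength / slideInterval)  // 10 batches per window
```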

Answer

I think you are confusing slideInterval with windowLength. In window(windowLength, slideInterval):

  1. windowLength is the duration of the window, i.e., how many batch intervals of data each window computation covers.
  2. slideInterval is the interval at which the window operation is performed, i.e., how far the window moves between successive computations.
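The distinction can be mimicked with plain Scala collections, where sliding(size, step) plays the role of window(windowLength, slideInterval) (a toy analogy only, no Spark involved):

```scala
// Pretend each element is one 2-second batch; 20 batches = 40 s of stream.
val batches = (1 to 20).toList

// windowLength = 10 batches, slideInterval = 1 batch:
// a computation fires at every step, each seeing 10 batches of data.
val overlapping = batches.sliding(10, 1).toList
println(overlapping.length) // 11 window computations

// windowLength = 10 batches, slideInterval = 10 batches:
// a computation fires only once per full window.
val tumbling = batches.sliding(10, 10).toList
println(tumbling.length)    // 2 window computations
```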

If I understand your question correctly, you should change it to .window(Seconds(x), Seconds(20)), where x is your desired window length (20 seconds in your case): with a slideInterval of 20 seconds, the windowed computation, and hence your print, runs every 20 seconds.
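As an aside, your foreachRDD calls count(), min(), and max() on the same RDD, which triggers three separate passes; on an RDD[Double], Spark's rdd.stats() returns a StatCounter carrying all three from a single pass. The single-pass idea in plain Scala (a sketch with a hypothetical Stats class, not Spark's API):

```scala
// One-pass count/min/max accumulator, mirroring what Spark's StatCounter does.
case class Stats(count: Long, min: Double, max: Double) {
  def merge(x: Double): Stats =
    Stats(count + 1, math.min(min, x), math.max(max, x))
}

val quality = Seq(3.2, 7.5, 1.1, 9.8, 4.4)
val s = quality.foldLeft(Stats(0, Double.MaxValue, Double.MinValue))(_ merge _)
println(s"${s.count}-${s.min}-${s.max}") // 5-1.1-9.8
```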