Ahmed El-Gamal Ahmed El-Gamal - 4 months ago 15
Scala Question

Sorting a DStream and taking topN

I have some DStream in Spark Scala and I want to sort it then take the top N.
The problem is that whenever I try to run it I get

NotSerializableException
and the exception message says:


This is because the DStream object is being referred to from within the closure.


The problem is that I don't know how to solve it:

Here is my try:

package com.badrit.realtime

import java.util.Date

import com.badrit.drivers.UnlimitedSpaceTimeDriver
import com.badrit.model.{CellBuilder, DataReader, Trip}
import com.badrit.utility.Printer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}

import scala.collection.mutable

object StreamingDriver {
val appName: String = "HotSpotRealTime"
val hostName = "localhost"
val port = 5050
val constrains = UnlimitedSpaceTimeDriver.constrains;
var streamingRate = 1;
var windowSize = 8;
var slidingInterval = 2;
val cellBuilder = new CellBuilder(constrains)
val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv"

def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = {

val sparkCtx = sparkStreamCtx.sparkContext
val textFile: RDD[String] = sparkCtx.textFile(inputFilePath)
val data: RDD[Trip] = new DataReader().getTrips(textFile)
val groupedData = data.filter(_.pickup.date.before(new Date(2015, 1, 2, 0, 0, 0)))
.groupBy(trip => trip.pickup.date.getMinutes).sortBy(_._1).map(_._2).collect()

printf("Grouped Data Count is " + groupedData.length)
var dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty;

groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray))
printf("\n\nTest Queue size is " + dataQueue.size)


groupedData.zipWithIndex.foreach { case (trips: Iterable[Trip], index: Int) => {
println("Items List " + index)


val passengers: Array[Int] = trips.map(_.passengers).toArray
val cnt = passengers.length
println("Sum is " + passengers.sum)
println("Cnt is " + cnt)

val passengersRdd = sparkCtx.parallelize(passengers)
println("Mean " + passengersRdd.mean())
println("Stdv" + passengersRdd.stdev())

}
}
sparkStreamCtx.queueStream(dataQueue, true)
}


def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup)

def main(args: Array[String]) {
if (args.length < 1) {
streamingRate = 1;
windowSize = 3 //2 hours 60 * 60 * 1000L
slidingInterval = 2 //0.5 hour 60 * 60 * 1000L
}
else {
streamingRate = args(0).toInt;
windowSize = args(1).toInt
slidingInterval = args(2).toInt
}

val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]")
val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate))
sparkStreamCtx.sparkContext.setLogLevel("ERROR")
sparkStreamCtx.checkpoint("/tmp")

val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx)
val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval))

//my main problem lies in the following line
val newDataWindow = dataWindow.transform(rdd => sparkStreamCtx.sparkContext.parallelize(rdd.take(10)))
newDataWindow.print

sparkStreamCtx.start()
sparkStreamCtx.awaitTerminationOrTimeout(1000)

}
}


I don't mind any other ways to sort a DStream and get its top N rather than my way.

Answer

You can use transform method in the DStream object then sort the input RDD and take n elements of it in a list, then filter the original RDD to be contained in this list.

val n = 10
val topN = result.transform(rdd =>{
   val list = rdd.sortBy(_._1).take(n)
   rdd.filter(list.contains)
})
topN.print