
Spark reduceByKey reduce only on certain conditions

I'm trying to reduce time-series data so that results occurring within the same hour are collected into an array (in order to detect the max, min, and avg).

It doesn't look like I'm able to provide a condition within the reduce block that determines whether a reduction should occur (values added to the array) or be skipped.

// data
// ID, VAL, DATETIME
tvFile.map((x) =>
    (x.split(',')(0), (Array(x.split(',')(1)), Array(x.split(',')(2))))) // (ID, ([VAL], [DATETIME]))
  .reduceByKey((a, b) => {
    val dt1 = DateTime.parse(a._2(0))
    val dt2 = DateTime.parse(b._2(0))
    if ((dt1.getDayOfYear == dt2.getDayOfYear) && (dt1.getHourOfDay == dt2.getHourOfDay))
      (a._1 ++ b._1, a._2 ++ b._2)
    else
      // NOT SURE WHAT TO DO HERE
  }).collect


The above is probably neither the most efficient nor the most correct approach; I'm just starting out with Spark/Scala.

Answer

The approach should be to prepare the data so that the key partitions it for the aggregation. Following the code in the question, the key should be (id, day-of-year, hour-of-day).

Once the data is prepared correctly, the aggregation is trivial.

Example:

import org.joda.time.DateTime

val sampleData =  Seq("p1,38.1,2016-11-26T11:15:10",
                      "p1,39.1,2016-11-26T11:16:10",
                      "p1,35.8,2016-11-26T11:17:10",
                      "p1,34.1,2016-11-26T11:18:10",
                      "p2,37.2,2016-11-26T11:16:00",
                      "p2,31.2,2016-11-27T11:17:00",
                      "p2,31.6,2016-11-27T11:17:00",
                      "p1,39.4,2016-11-26T12:15:10",
                      "p2,36.3,2016-11-27T10:10:10",
                      "p1,39.5,2016-11-27T12:15:00",
                      "p3,36.1,2016-11-26T11:15:10") 

val sampleDataRdd = sparkContext.parallelize(sampleData)                          

val records = sampleDataRdd.map{line => 
                            val parts = line.split(",")
                            val id = parts(0)
                            val value = parts(1).toDouble
                            val dateTime = DateTime.parse(parts(2))
                            val doy = dateTime.getDayOfYear
                            val hod = dateTime.getHourOfDay
                            ((id, doy, hod), value)
                           }

val aggregatedRecords = records.reduceByKey(_ + _)                               
aggregatedRecords.collect
// Array[((String, Int, Int), Double)] = Array(((p1,331,11),147.10000000000002), ((p2,332,11),62.8), ((p2,331,11),37.2), ((p1,332,12),39.5), ((p2,332,10),36.3), ((p1,331,12),39.4), ((p3,331,11),36.1))  
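The sum above is just one possible aggregation. If the end goal is the max, min, and average per hour, as mentioned in the question, one option is aggregateByKey, which keeps a running (min, max, sum, count) per key instead of collecting values into arrays. A minimal sketch, reusing the records RDD from above:

val zero = (Double.MaxValue, Double.MinValue, 0.0, 0L) // (min, max, sum, count)

val stats = records.aggregateByKey(zero)(
  // fold a single value into the accumulator
  (acc, v) => (math.min(acc._1, v), math.max(acc._2, v), acc._3 + v, acc._4 + 1L),
  // merge two accumulators coming from different partitions
  (a, b)   => (math.min(a._1, b._1), math.max(a._2, b._2), a._3 + b._3, a._4 + b._4)
).mapValues { case (mn, mx, sum, count) => (mn, mx, sum / count) } // (min, max, avg)

stats.collect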

This is also a lot easier with Spark DataFrames; I answered using the RDD API because that's how the question was asked.
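For reference, a rough DataFrame equivalent could look like the sketch below (assuming Spark 2.x with a SparkSession available; the column names id, value, and ts are just illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("hourly-stats").getOrCreate()
import spark.implicits._

// Parse the same sample lines into columns, then group by id / day-of-year / hour.
val df = sampleData
  .map(_.split(","))
  .map(p => (p(0), p(1).toDouble, java.sql.Timestamp.valueOf(p(2).replace("T", " "))))
  .toDF("id", "value", "ts")

df.groupBy($"id", dayofyear($"ts").as("doy"), hour($"ts").as("hod"))
  .agg(min($"value").as("min"), max($"value").as("max"), avg($"value").as("avg"))
  .show()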