tamersalama - 1 year ago
Scala Question

Spark reduceByKey reduce only on certain conditions

I'm trying to reduce time-series data so that all values recorded within the same hour are collected into one array (for computing the max, min, and avg).

It doesn't look like I'm able to put a condition inside the reduce block that decides whether a reduction should occur (values added to the array) or be skipped.

tvFile.map((x) =>
    (x.split(',')(0), (Array(x.split(',')(1)), Array(x.split(',')(2))))) // (ID, ([VAL], [DATETIME]))
  .reduceByKey((a, b) => {
    val dt1 = DateTime.parse(a._2(0))
    val dt2 = DateTime.parse(b._2(0))
    if ((dt1.getDayOfYear == dt2.getDayOfYear) && (dt1.getHourOfDay == dt2.getHourOfDay))
      (a._1 ++ b._1, a._2 ++ b._2)
    else
      ??? // stuck here: reduceByKey must return a value, so the pair cannot simply be skipped
  })

The above is probably not the most efficient or correct approach; I'm just starting out with Spark and Scala.

Answer Source

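The approach should be to prepare the data so that its key already partitions it for the aggregation. reduceByKey only combines values that share a key, so the condition belongs in the key rather than in the reduce function. Following the code in the question, the key here should be (id, day-of-year, hour-of-day).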
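As a rough sketch of that key construction (using `java.time` rather than the `DateTime` class from the question, purely so the snippet has no external dependency; the `HourKey` object and its method names are hypothetical): the first helper builds the key described above, and the second is a variant that truncates the timestamp to the start of its hour, which may be preferable if the data spans more than one year, since day-of-year and hour-of-day alone repeat every year.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical helper object, not part of the original answer.
object HourKey {
  // Key in the shape described above: (id, day-of-year, hour-of-day).
  def byDayAndHour(id: String, ts: String): (String, Int, Int) = {
    val dt = LocalDateTime.parse(ts) // expects ISO-8601, e.g. 2016-11-26T11:15:10
    (id, dt.getDayOfYear, dt.getHour)
  }

  // Variant that keeps the year: the timestamp truncated to the start of its hour.
  def byHourStart(id: String, ts: String): (String, LocalDateTime) =
    (id, LocalDateTime.parse(ts).truncatedTo(ChronoUnit.HOURS))
}
```

Either key can be used as the first element of the pair that is later passed to reduceByKey.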

Once the data is prepared correctly, the aggregation is trivial.


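import org.joda.time.DateTime // assumption: the DateTime calls used here match Joda-Time

// Only the first sample record survives in this copy; the remaining ones are elided.
val sampleData = Seq("p1,38.1,2016-11-26T11:15:10" /* , ... */)

val sampleDataRdd = sparkContext.parallelize(sampleData)

// Key each record by (id, day-of-year, hour-of-day) so that reduceByKey
// only ever combines values from the same id and hour.
val records = sampleDataRdd.map { line =>
  val parts = line.split(",")
  val id = parts(0)
  val value = parts(1).toDouble
  val dateTime = DateTime.parse(parts(2))
  val doy = dateTime.getDayOfYear
  val hod = dateTime.getHourOfDay
  ((id, doy, hod), value)
}

// Sum the values per key.
val aggregatedRecords = records.reduceByKey(_ + _)
// Collected result for the full sample data:
// Array[((String, Int, Int), Double)] = Array(((p1,331,11),147.10000000000002), ((p2,332,11),62.8), ((p2,331,11),37.2), ((p1,332,12),39.5), ((p2,332,10),36.3), ((p1,331,12),39.4), ((p3,331,11),36.1))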
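The code above sums the values, while the question asks for max, min, and avg. One way to get there, sketched here under assumptions, is to reduce a small accumulator instead of a bare Double. The `Stats` case class, the `HourlyStats` object, and all method names are hypothetical, `java.time` stands in for the question's `DateTime`, and a local `groupBy` plus `reduce` stands in for `reduceByKey` so that the snippet runs without a Spark cluster.

```scala
import java.time.LocalDateTime

// Running statistics for one (id, day-of-year, hour-of-day) bucket.
final case class Stats(count: Long, sum: Double, min: Double, max: Double) {
  def avg: Double = sum / count
}

object Stats {
  // Statistics of a single reading.
  def of(v: Double): Stats = Stats(1L, v, v, v)

  // Associative and commutative, which is what a reduceByKey function needs to be.
  def merge(a: Stats, b: Stats): Stats =
    Stats(a.count + b.count, a.sum + b.sum, math.min(a.min, b.min), math.max(a.max, b.max))
}

object HourlyStats {
  type Key = (String, Int, Int) // (id, day-of-year, hour-of-day)

  // Parse one "id,value,datetime" line into a keyed single-reading record.
  def toRecord(line: String): (Key, Stats) = {
    val parts = line.split(",")
    val dt = LocalDateTime.parse(parts(2))
    ((parts(0), dt.getDayOfYear, dt.getHour), Stats.of(parts(1).toDouble))
  }

  // Local stand-in for reduceByKey: group by key, then fold each group with merge.
  def aggregate(lines: Seq[String]): Map[Key, Stats] =
    lines.map(toRecord).groupBy(_._1).map { case (key, recs) =>
      key -> recs.map(_._2).reduce(Stats.merge)
    }
}
```

On the RDD from the answer, the equivalent would presumably be `records.mapValues(Stats.of).reduceByKey(Stats.merge)`, after which each value exposes `min`, `max`, and `avg` without ever materializing an array of readings.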

This is also a lot easier with Spark DataFrames. I answered using the RDD API because that is what the question uses.
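For comparison, a DataFrame version might look roughly like the sketch below. It assumes Spark SQL is on the classpath and that the input has already been loaded into columns named `id`, `value`, and `ts`; those column names and the `HourlyStatsDf` object are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, dayofyear, hour, max, min}

// Hypothetical helper, not part of the original answer.
object HourlyStatsDf {
  // Expects columns: id (string), value (double), ts (ISO-8601 string).
  def hourly(df: DataFrame): DataFrame =
    df.withColumn("ts", col("ts").cast("timestamp"))
      // Same grouping key as the RDD version: id, day-of-year, hour-of-day.
      .groupBy(col("id"), dayofyear(col("ts")).as("doy"), hour(col("ts")).as("hod"))
      .agg(min("value").as("min"), max("value").as("max"), avg("value").as("avg"))
}
```

Here the grouping columns play the role of the composite key, and the three aggregates are computed in a single pass.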
