Vincent Roma - 1 year ago
Scala Question

How to use reduceByKey on multiple keys in a Scala Spark job

I'm relatively new to Spark and I'm trying to group data by multiple keys at the same time.

I have some data that I map so it ends up looking like this:

((K1,K2,K3), (V1,V2))

My goal is to group by (K1,K2,K3) and respectively sum V1 and V2 to end up with:

((K1,K2,K3), (SUM(V1),SUM(V2)))

Here is the code I have so far:

val filepath = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data =
val dataRDD = data.rdd

val mappedDataRDD = dataRDD.map {
  case (v, w, x, y, z) => ((v,w,x), (y, z))
}.reduceByKey((x,y)=> ???)

So I'm looking for how to reduceByKey so I can group by the (v,w,x) keys and sum the y and z.

Answer Source

I think what you are looking for and should use is aggregateByKey.

This method takes two parameter groups. The first parameter group takes the starting (zero) value of the accumulator. The second parameter group takes two functions:

  1. A function which accumulates a value into an accumulator.
  2. A function which combines two accumulators.
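
To make the two roles concrete, here is a minimal sketch using plain Scala collections (no Spark), assuming the data sits in two in-memory "partitions". The names `AggregateSketch`, `aggregateByKeyLocal`, `seqOp`, `combOp` and the sample data are hypothetical; the code only imitates what aggregateByKey does conceptually and is not how Spark implements it.

```scala
object AggregateSketch {
  type Key = (String, String, String)
  type Value = (Int, Int)
  type Acc = (Long, Long)

  // Function 1: fold one value into an accumulator (applied within a partition).
  val seqOp: (Acc, Value) => Acc = {
    case ((accY, accZ), (y, z)) => (accY + y, accZ + z)
  }

  // Function 2: merge two accumulators (applied across partitions).
  val combOp: (Acc, Acc) => Acc = {
    case ((y1, z1), (y2, z2)) => (y1 + y2, z1 + z2)
  }

  // Hypothetical helper imitating aggregateByKey on in-memory "partitions".
  def aggregateByKeyLocal(zero: Acc, partitions: Seq[Seq[(Key, Value)]]): Map[Key, Acc] = {
    // Step 1: in each partition, accumulate the values per key, starting from zero.
    val perPartition: Seq[Map[Key, Acc]] = partitions.map { part =>
      part.foldLeft(Map.empty[Key, Acc]) { case (m, (k, v)) =>
        m.updated(k, seqOp(m.getOrElse(k, zero), v))
      }
    }
    // Step 2: merge the per-partition accumulators key by key.
    perPartition.foldLeft(Map.empty[Key, Acc]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, a)) =>
        acc.updated(k, acc.get(k).map(combOp(_, a)).getOrElse(a))
      }
    }
  }

  // Made-up sample data: the key ("a", "b", "c") appears in both partitions.
  val partitions: Seq[Seq[(Key, Value)]] = Seq(
    Seq((("a", "b", "c"), (1, 10)), (("a", "b", "d"), (2, 20))),
    Seq((("a", "b", "c"), (3, 30)))
  )

  val result: Map[Key, Acc] = aggregateByKeyLocal((0L, 0L), partitions)
}
```

Note that the values are (Int, Int) while the accumulator is (Long, Long); keeping those two types separate is exactly what the zero value and the two functions make possible.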

Now you can use it as follows,

val (accZeroY, accZeroZ): (Long, Long) = (0, 0) 

val mappedDataRDD = dataRDD
  .map { case (v, w, x, y, z) => ((v, w, x), (y, z)) }
  .aggregateByKey((accZeroY, accZeroZ))(
    // accumulate one (y, z) value into the running sums
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) },
    // combine the running sums of two partitions
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )

As you may have noticed, both functions in the second parameter group do the same thing in this case. That is possible only when the type of the accumulator is the same as the type of the values in the key-value RDD (PairRDD).

In such cases you can also use reduceByKey, which you can think of as an aggregateByKey with the same function passed as both function parameters:

val mappedDataRDD = dataRDD
  .map { case (v, w, x, y, z) => ((v, w, x), (y, z)) }
  .reduceByKey { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) }

But in my opinion, you should NOT use reduceByKey. The reason I suggested aggregateByKey is that accumulating values over large datasets can sometimes produce a result which is outside the range of your type.

For example, in your case I suspect that your (y, z) is actually an (Int, Int) and you want to accumulate it using (v, w, x) as the key. But whenever you add up a large number of Int values, remember that the result can end up bigger than what an Int can hold.

So you will want the type of your accumulation to be something with a bigger range than (Int, Int), like (Long, Long), and reduceByKey does not allow that because its result must have the same type as the values. That is why I say that aggregateByKey is probably what you are looking for and should use.
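
As a small illustration of the overflow concern, here is a sketch in plain Scala (no Spark). The object name `OverflowSketch` and the two sample values are made up; `reduce` stands in for reduceByKey because it must return the element type, and `foldLeft` with a Long zero stands in for aggregateByKey.

```scala
object OverflowSketch {
  // Two hypothetical Int values whose true sum does not fit in an Int.
  val values: Seq[Int] = Seq(Int.MaxValue, 1)

  // reduce keeps the element type (Int), so the sum silently wraps around.
  val intSum: Int = values.reduce(_ + _)

  // Folding into a Long accumulator widens each Int before adding,
  // so the true sum is preserved.
  val longSum: Long = values.foldLeft(0L)((acc, v) => acc + v)
}
```

Here `intSum` wraps around to `Int.MinValue`, while `longSum` holds the true sum 2147483648.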
