Poliana Soares - 2 months ago
Scala Question

# Calculate average by keys in Spark

I'm trying to calculate the average of a data set, the data used for my test is:

``````val arr = Array(("D1", List(("k1",100.10,4), ("k2",50.5,3))),
("D2", List(("k1",230.24,7), ("k3",157.2,5))),
("D3", List(("k2",120,6), ("k4",340.8,16))))
``````

The operations done until now:

``````val s1 = sc.parallelize(arr.toSeq).flatMap { x => x._2.groupBy(_._1)}
val s2 = s1.map {
  case (k, v) => (v(0)._1, (v(0)._2, v(0)._3))
}
val s3 = s2.groupByKey()
``````

The type of `s3` is
`org.apache.spark.rdd.RDD[(String, Iterable[(AnyVal, Int)])]`

``````(k3,CompactBuffer((157.2,5)))
(k4,CompactBuffer((340.8,16)))
(k2,CompactBuffer((50.5,3), (120,6)))
(k1,CompactBuffer((100.1,4), (230.24,7)))
``````

And now I'd like to do an operation so that the result would be:

(k3, ( 157.2 / 5 ))
(k4, ( 340.8 / 16 ))
(k2, ( (50.5 + 120) / (3 + 6) ))
(k1, ( (100.1 + 230.24) / (4 + 7) ))
``````

I'm really confused about this. How can I get this result?

First of all, you should remove the `.toSeq` and just do `sc.parallelize(arr)`, since `parallelize` accepts an `Array` directly. Also, when dealing with tuples, try to use pattern matching to keep things readable.

Also, from the looks of it, you want to compute an average using `t._1` of each tuple inside the `List` as the aggregation key. In that case you do not need any `groupBy` operation:

``````val arr = Array(
  ("D1", List(("k1", 100.10, 4), ("k2", 50.5, 3))),
  ("D2", List(("k1", 230.24, 7), ("k3", 157.2, 5))),
  ("D3", List(("k2", 120.0, 6), ("k4", 340.8, 16)))  // 120.0, not 120, so every value is a Double
)

// RDD[(String, List[(String, Double, Int)])]
val s = sc.parallelize(arr)

// RDD[(String, (Double, Int))]
val s2 = s.flatMap({
  case (id, list) => list.map({
    case (key, value, count) => (key, (value, count))
  })
})

// You do not need s3 at all.
// `groupByKey` in Spark is very costly.
// You already have a PairRDD in s2 with key -> String and value -> (Double, Int),
// so just go ahead and aggregate it.

// RDD[(String, (Double, Int))]
val s4 = s2.aggregateByKey((0.0, 0))(
  // seqOp: fold one (value, count) pair into the running (total, count)
  { case ((total, count), (value, n)) => (total + value, count + n) },
  // combOp: merge the partial (total, count) pairs from different partitions
  { case ((total1, count1), (total2, count2)) => (total1 + total2, count1 + count2) }
)

// RDD[(String, Double)]
val s5 = s4.map({ case (key, (total, count)) => (key, total / count) })
``````
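To sanity-check the expected averages, the same fold can be run on plain Scala collections, with no SparkContext needed; this mirrors the `seqOp`/`combOp` logic of `aggregateByKey` (the `averages` name here is just for illustration):

``````scala
// Same data as above, with 120.0 so every value is a Double
val arr = Array(
  ("D1", List(("k1", 100.10, 4), ("k2", 50.5, 3))),
  ("D2", List(("k1", 230.24, 7), ("k3", 157.2, 5))),
  ("D3", List(("k2", 120.0, 6), ("k4", 340.8, 16)))
)

val averages: Map[String, Double] = arr
  .flatMap { case (_, list) => list }  // flatten to (key, value, count) triples
  .groupBy(_._1)                       // group triples by key
  .map { case (key, group) =>
    val total = group.map(_._2).sum    // sum of values per key
    val count = group.map(_._3).sum    // sum of counts per key
    key -> total / count
  }
``````

This gives, for example, `(50.5 + 120.0) / (3 + 6)` for `k2`, matching the result Spark should produce.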
Source (Stackoverflow)