Poliana Soares - 2 months ago
Scala Question

Calculate average by keys in Spark

I'm trying to calculate the average of a data set. The data used for my test is:

val arr = Array(("D1", List(("k1",100.10,4), ("k2",50.5,3))),
                ("D2", List(("k1",230.24,7), ("k3",157.2,5))),
                ("D3", List(("k2",120,6), ("k4",340.8,16))))


The operations done until now:

val s1 = sc.parallelize(arr.toSeq).flatMap { x => x._2.groupBy(_._1) }
val s2 = s1.map {
  case (k, v) => (v(0)._1, (v(0)._2, v(0)._3))
}
val s3 = s2.groupByKey()


This s3 has type
org.apache.spark.rdd.RDD[(String, Iterable[(AnyVal, Int)])]
and contains:


(k3,CompactBuffer((157.2,5)))
(k4,CompactBuffer((340.8,16)))
(k2,CompactBuffer((50.5,3), (120,6)))
(k1,CompactBuffer((100.1,4), (230.24,7)))


And now I'd like to do an operation so that the result would be:

(k3, (157.2 / 5))
(k4, (340.8 / 16))
(k2, ((50.5 + 120) / (3 + 6)))
(k1, ((100.1 + 230.24) / (4 + 7)))


I'm really confused about this. How can I get this result?

Answer

First of all, you should replace sc.parallelize(arr.toSeq) with just sc.parallelize(arr). Also, when dealing with tuples, try to use pattern matching to keep things readable.
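For instance, compare positional access with the equivalent pattern match (a minimal illustration with made-up sample data, not your RDD):

// positional access: the reader must remember what _1 and _2 mean
val byIndex = sc.parallelize(Seq(("a", 1), ("b", 2))).map(x => (x._1, x._2 + 1))

// pattern matching: the names document the structure
val byPattern = sc.parallelize(Seq(("a", 1), ("b", 2))).map { case (key, count) => (key, count + 1) }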

Also, from the looks of it, you want to compute an average using the first element of each tuple inside the List as the aggregation key. In that case you do not need any groupBy operation. (One detail: write 120 as 120.0; mixing an Int literal with Doubles is exactly what turned your value type into AnyVal.)

val arr = Array(
  ("D1", List(("k1",100.10,4), ("k2",50.5,3))),
  ("D2", List(("k1",230.24,7), ("k3",157.2,5))),
  ("D3", List(("k2",120,6), ("k4",340.8,16)))
)

// RDD[(String, List[(String, Double, Int)])]
val s = sc.parallelize(arr)

// RDD[(String, (Double, Int))]
val s2 = s.flatMap {
  case (id, list) => list.map {
    case (key, f1, i1) => (key, (f1, i1))
  }
}

// you do not need s3 at all
// `groupByKey` in Spark is very costly
// you already have a pair RDD in s2 with key -> String and value -> (Double, Int)
// just go ahead and aggregate it

// RDD[(String,(Float, Int))]
val s4 = s2.aggregateByKey((0.0, 0))(
  { case ((total, count), (f1, i1)) => (total + f1, count + i1) },
  { case ((total1, count1), (total2, count2)) => (total1 + total2, count1 + count2) }
)
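
// (aside) the zero value (0.0, 0) contributes nothing here, so the same
// aggregation could also be written with reduceByKey; a sketch, assuming
// s2 as defined above:
val s4Alt = s2.reduceByKey {
  case ((total1, count1), (total2, count2)) => (total1 + total2, count1 + count2)
}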

// RDD[(String, Double)]
val s5 = s4.map { case (key, (total, count)) => (key, total / count) }
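
Collecting s5 for the sample data should print roughly the following (ordering may vary, and the floating-point digits shown are approximate):

s5.collect().foreach(println)
// (k3,31.44)
// (k4,21.3)
// (k2,18.9444...)
// (k1,30.0309...)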