Antonio Calì - 7 months ago
Scala Question

How to use reduceByKey on an RddPair<K,Tuple> in Scala

I have a Cassandra table that I access with SparkContext.cassandraTable(), which retrieves all my CassandraRows.

Now I want to keep three pieces of information: (user, city, byte).
I store them like this:

I obtain an RDD[(String, Iterable[(String, Long)])].
Now, for each user, I want to sum all the bytes and build a Map like "city" -> "occurrences" (how many times that city appears for this user).

Previously, I split this code into two different RDDs: one to sum the bytes, the other to create the map described above.

For example, for the city occurrences: (user._1, user._2.size, user._2.groupBy(identity).map(city => (city._1, city._2.size)))

That worked because I could access the second element of my tuple with the ._2 method. But what now?
My second element is an Iterable[(String, Long)], and I can no longer map over it as I did before.
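The Iterable[(String, Long)] can still be mapped over; each element is just a (city, bytes) pair instead of a bare city. Below is a minimal local sketch of that per-user step, assuming the grouped value holds (city, bytes) pairs. It uses plain Scala collections instead of an RDD so it runs without Spark, and the names PerUserUsage and summarize are hypothetical. On a real RDD of that type, the same function could be applied with grouped.mapValues(summarize).

```scala
object PerUserUsage {
  // Summarize one user's (city, bytes) records into (total bytes, city -> occurrences).
  def summarize(records: Iterable[(String, Long)]): (Long, Map[String, Int]) = {
    // Sum the bytes with foldLeft, without building an intermediate collection.
    val totalBytes = records.foldLeft(0L) { case (acc, (_, bytes)) => acc + bytes }
    // Group the pairs by city and count how many records each city has.
    val cityCounts = records.groupBy(_._1).map { case (city, rs) => city -> rs.size }
    (totalBytes, cityCounts)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical data already grouped per user, mirroring
    // RDD[(String, Iterable[(String, Long)])]; on a real RDD: grouped.mapValues(summarize)
    val grouped: Seq[(String, Iterable[(String, Long)])] = Seq(
      "user1" -> Seq(("city1", 100L), ("city1", 100L), ("city2", 50L)),
      "user2" -> Seq(("city1", 30L))
    )
    grouped.map { case (user, recs) => (user, summarize(recs)) }.foreach(println)
  }
}
```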

Is there a way to retrieve all my information with just one RDD and a single MapReduce?


You can do this easily by first aggregating the bytes and city occurrences per (user, city) pair, and then grouping by user:

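// sc is the SparkContext provided by spark-shell
val data = Array(("user1","city1",100),("user1","city1",100) /* , ... remaining records elided in the original */)
val rdd = sc.parallelize(data)

// Key by (user, city), sum (occurrences, bytes), then re-key by user: (user, (city, occurrences, bytes))
val res = rdd.map(x => ((x._1, x._2), (1, x._3)))
             .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
             .map(x => (x._1._1, (x._1._2, x._2._1, x._2._2)))

// Group by user: list of (city, occurrences) plus the user's total bytes
val userCityUsageRdd = res.groupByKey().map(x => {
  val m = x._2.toList
  (x._1, m.map(y => (y._1, y._2)), m.map(y => y._3).reduce(_ + _))
})
userCityUsageRdd.collect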


res20: Array[(String, List[(String, Int)], Int)] = 
 Array((user1,List((city1,3), (city3,1), (city2,3)),700), 
       (user2,List((city1,1), (city2,1)),200))
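Since the title asks about reduceByKey specifically, the same result can also be produced in a single reduceByKey pass by carrying (total bytes, city -> count) as the value and merging the maps. This is an alternative sketch, not the answerer's code: the names ReduceByKeyUsage, Usage, toPair and merge are hypothetical, and a local groupBy/reduce stands in for Spark so it runs without a cluster. With a real RDD[(String, String, Long)] the equivalent call would be rdd.map(toPair).reduceByKey(merge).

```scala
object ReduceByKeyUsage {
  // Value carried per user: (total bytes, city -> occurrences).
  type Usage = (Long, Map[String, Int])

  // Turn one (user, city, bytes) record into a (key, value) pair for reduceByKey.
  def toPair(rec: (String, String, Long)): (String, Usage) = {
    val (user, city, bytes) = rec
    (user, (bytes, Map(city -> 1)))
  }

  // Merge two partial results: add the byte totals and add the per-city counts.
  // The operation is associative and commutative, as reduceByKey requires.
  def merge(a: Usage, b: Usage): Usage = {
    val counts = b._2.foldLeft(a._2) { case (acc, (city, n)) =>
      acc.updated(city, acc.getOrElse(city, 0) + n)
    }
    (a._1 + b._1, counts)
  }

  // Local stand-in for rdd.map(toPair).reduceByKey(merge) on a Spark RDD.
  def reduceLocally(records: Seq[(String, String, Long)]): Map[String, Usage] =
    records.map(toPair).groupBy(_._1).map { case (user, pairs) =>
      user -> pairs.map(_._2).reduce((x, y) => merge(x, y))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample records: (user, city, bytes).
    val records = Seq(
      ("user1", "city1", 100L),
      ("user1", "city2", 100L),
      ("user1", "city1", 100L),
      ("user2", "city1", 50L)
    )
    reduceLocally(records).foreach(println)
  }
}
```

A possible advantage of this shape in Spark: groupByKey shuffles every record, while reduceByKey combines values within each partition before the shuffle, which is usually cheaper when users have many records.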