Scala Question

How to reduceByKey on an RDD[(K, Tuple)] in Scala

I have a Cassandra table that I access with SparkContext.cassandraTable(), retrieving all my CassandraRows.

Now I want to keep three pieces of information per row: (user, city, byte). I store them like this:

rddUsersFilter.map(row =>
  (row.getString("user"), (row.getString("city"), row.getString("byte").replace(",", "").toLong))
).groupByKey


I obtain an RDD[(String, Iterable[(String, Long)])].
Now, for each user, I want to sum all the bytes and build a Map like "city" -> "occurrences" (how many times that city appears for that user).
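
To make the target concrete, here is a minimal plain-Scala sketch of the per-user result I am after (the values are invented just for illustration):

val cityBytes: Iterable[(String, Long)] =
  Seq(("city1", 100L), ("city1", 100L), ("city2", 100L))   // one user's (city, byte) pairs

val totalBytes: Long = cityBytes.map(_._2).sum                          // 300
val cityOccurrences: Map[String, Int] =
  cityBytes.groupBy(_._1).map { case (city, xs) => (city, xs.size) }    // Map(city1 -> 2, city2 -> 1)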

Previously, I split this into two different RDDs: one to sum the bytes, the other to build the map described above.

Example of the occurrence count per city:

rddUsers.map(user =>
  (user._1, user._2.size, user._2.groupBy(identity).map(city => (city._1, city._2.size))))


That worked because I could access the second element of my tuple with the ._2 method. But now my second element is an Iterable[(String, Long)], and I can't map over it the way I did before.

Is there a way to get all of this information with just one RDD and a single map/reduce pass?

Answer

You can do this by first aggregating the byte total and the occurrence count per (user, city) with reduceByKey, and then grouping by user:

val data = Array(("user1","city1",100),("user1","city1",100),
     ("user1","city1",100),("user1","city2",100),("user1","city2",100),
     ("user1","city3",100),("user1","city2",100),("user2","city1",100),
     ("user2","city2",100))
val rdd = sc.parallelize(data)

// Aggregate (occurrences, bytes) per (user, city), then regroup by user
val res = rdd.map(x => ((x._1, x._2), (1, x._3)))
             .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
             .map(x => (x._1._1, (x._1._2, x._2._1, x._2._2)))
             .groupByKey

// For each user: the list of (city, occurrences) and the total bytes
val userCityUsageRdd = res.map(x => {
  val m = x._2.toList
  (x._1, m.map(y => (y._1, y._2)), m.map(_._3).reduce(_ + _))
})

Output of userCityUsageRdd.collect:

res20: Array[(String, List[(String, Int)], Int)] = 
 Array((user1,List((city1,3), (city3,1), (city2,3)),700), 
       (user2,List((city1,1), (city2,1)),200))
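
For completeness, here is a sketch of how the same pipeline could plug into the rddUsersFilter from the question (the column names "user", "city", "byte" and the comma-stripping come from the question; everything else here is an assumption), with a final .toMap so each user gets the city -> occurrences Map the question asks for:

// Sketch only: assumes the Cassandra RDD and columns from the question
val userCityUsage = rddUsersFilter
  .map(row => ((row.getString("user"), row.getString("city")),
               (1, row.getString("byte").replace(",", "").toLong)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))          // per (user, city): (occurrences, bytes)
  .map { case ((user, city), (occ, bytes)) => (user, (city, occ, bytes)) }
  .groupByKey
  .map { case (user, cityStats) =>
    val stats = cityStats.toList
    (user,
     stats.map(s => (s._1, s._2)).toMap,   // Map(city -> occurrences)
     stats.map(_._3).sum)                  // total bytes for this user
  }

The result is an RDD[(String, Map[String, Int], Long)], computed with a single reduceByKey plus one groupByKey, mirroring the example above.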