Mike Mike - 1 year ago 83
Scala Question

Scala Spark map type matching issue

I'm trying to perform a series of transformations on log data with Scala, and I'm having difficulties with matching tuples. I have a data frame with user ids, urls and dates. I can map the data frame to an RDD and reduce by key with this map:

val countsRDD = usersUrlsDays.map { case Row(date:java.sql.Date, user_id:Long, url:String) => Tuple2(Tuple2(user_id, url), 1) }.rdd.reduceByKey(_+_)

This gives me an RDD of ((user_id, url), count):

scala> countsRDD.take(1)
res9: Array[((Long, String), Int)]
scala> countsRDD.take(1)(0)
res10: ((Long, String), Int)

Now I want to invert that by url to yield:

(url, [(user_id, count), ...])

I have tried this:

val urlIndex = countsRDD.map{ case Row(((user_id:Long, url:String), count:Int)) => Tuple2(url, List(Tuple2(user_id, count))) }.reduceByKey(_++_)

This produces match errors, however:

scala.MatchError: ... (of class scala.Tuple2)

I've tried many, many different permutations of these two map calls with explicitly and implicit types and this seems to have gotten me the farthest. I'm hoping that someone here can help point me in the right direction.

Answer Source

Something like this should work:

  .map{ case ((user_id, url), count) => (url, (user_id, count)) }
  • countsRDD is RDD[((String, String), Int)] not RDD[Row].
  • There is no need to use TupleN. Tuple literals will work just fine.
  • Since countsRDD is statically typed (unlike RDD[Row]) you don't have to specify types.
  • Don't use reduceByKey for list concatenation. It is the worst possible approach you can take and ignores computational complexity, garbage colector and the common sense. If you really need grouped data use operation which is designed for it.
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download