Robin Kirchner Robin Kirchner - 1 month ago 8
Scala Question

Extract elements of lists in an RDD

What I want to achieve



I'm working with Spark and Scala. I have two Pair RDDs.

rdd1 : RDD[(String, List[String])]
rdd2 : RDD[(String, List[String])]


Both RDDs are joined on their first value.

val joinedRdd = rdd1.join(rdd2)


So the resulting RDD is of type
RDD[(String, (List[String], List[String]))]
. I want to map this RDD and extract the elements of both lists, so that the resulting RDD contains just these elements of the two lists.




Example



rdd1 (id, List(a, b))
rdd2 (id, List(d, e, f))
wantedResult (a, b, d, e, f)





Naive approach



My naive approach would be to adress each element directly with
(i)
, like below:

val rdd = rdd1.join(rdd2)
.map({ case (id, lists) =>
(lists._1(0), lists._1(1), lists._2(0), lists._2(2), lists._2(3)) })

/* results in RDD[(String, String, String, String, String)] */


Is there a way to get the elements of each list, without adressing each individually? Something like "
lists._1.extractAll
". Is there a way to use
flatMap
to achieve what I'm trying to achieve?

Answer

You can simply concatenate the two lists with the ++ operator:

val res: RDD[List[String]] = rdd1.join(rdd2)
  .map { case (_, (list1, list2)) => list1 ++ list2 }

Probably a better approach that would avoid to carry List[String] around that may be very big would be to explode the RDD into smaller (key value) pairs, concatenate them and then do a groupByKey:

val flatten1: RDD[(String, String)] = rdd1.flatMapValues(identity)
val flatten2: RDD[(String, String)] = rdd2.flatMapValues(identity)
val res: RDD[Iterable[String]] = (rdd1 ++ rdd2).groupByKey.values