Robin Kirchner - 1 year ago
Scala Question

Extract elements of lists in an RDD

What I want to achieve

I'm working with Spark and Scala. I have two Pair RDDs.

rdd1 : RDD[(String, List[String])]
rdd2 : RDD[(String, List[String])]

Both RDDs are joined on their first value.

val joinedRdd = rdd1.join(rdd2)

So the resulting RDD is of type RDD[(String, (List[String], List[String]))]. I want to map this RDD and extract the elements of both lists, so that the resulting RDD contains just the elements of the two lists. For example:


rdd1 (id, List(a, b))
rdd2 (id, List(d, e, f))
wantedResult (a, b, d, e, f)

Naive approach

My naive approach would be to address each element directly by index, like below:

val rdd = rdd1.join(rdd2)
  .map { case (id, lists) =>
    (lists._1(0), lists._1(1), lists._2(0), lists._2(1), lists._2(2)) }

/* results in RDD[(String, String, String, String, String)] */

Is there a way to get the elements of each list without addressing each one individually? Something like "
". Is there a way to use
to achieve what I'm trying to achieve?

Answer Source

You can simply concatenate the two lists with the ++ operator:

val res: RDD[List[String]] = rdd1.join(rdd2)
  .map { case (_, (list1, list2)) => list1 ++ list2 }
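To see what this produces on the sample data from the question, here is a minimal sketch that applies the same pattern match to a plain Scala collection instead of an RDD, so it runs without a Spark context. The names ConcatSketch, Joined, concatLists and sample are made up for illustration and are not part of Spark.

```scala
object ConcatSketch {
  // Stand-in for one element of rdd1.join(rdd2): (key, (list1, list2)).
  type Joined = (String, (List[String], List[String]))

  // Same function body as in the map above, applied to a local Seq.
  def concatLists(joined: Seq[Joined]): Seq[List[String]] =
    joined.map { case (_, (list1, list2)) => list1 ++ list2 }

  // The sample data from the question.
  val sample: Seq[Joined] =
    Seq(("id", (List("a", "b"), List("d", "e", "f"))))

  def main(args: Array[String]): Unit =
    concatLists(sample).foreach(println)
}
```

The result is a List[String] per key rather than a fixed-size tuple, so it also works when the lists have different lengths from one key to the next.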

Probably a better approach, which avoids carrying around List[String] values that may be very big, is to explode each RDD into smaller (key, value) pairs, concatenate them, and then do a groupByKey:

val flatten1: RDD[(String, String)] = rdd1.flatMapValues(identity)
val flatten2: RDD[(String, String)] = rdd2.flatMapValues(identity)
val res: RDD[Iterable[String]] = (flatten1 ++ flatten2).groupByKey.values
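As a rough end-to-end sketch of this second approach, the following runs it in Spark local mode on the sample data from the question. It assumes spark-core is on the classpath; the object name GroupByKeySketch, the helper groupedValues and the local[*] master are illustrative choices, not something from the question. It unions the two flattened RDDs, which is what the declared result type RDD[Iterable[String]] requires.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupByKeySketch {
  // Flattens both pair RDDs and groups all elements by key.
  def groupedValues(sc: SparkContext): RDD[(String, Iterable[String])] = {
    // Sample data from the question (assumed shape).
    val rdd1: RDD[(String, List[String])] =
      sc.parallelize(Seq("id" -> List("a", "b")))
    val rdd2: RDD[(String, List[String])] =
      sc.parallelize(Seq("id" -> List("d", "e", "f")))

    // One (key, element) pair per list element.
    val flatten1: RDD[(String, String)] = rdd1.flatMapValues(xs => xs)
    val flatten2: RDD[(String, String)] = rdd2.flatMapValues(xs => xs)

    // Union the pairs, then collect every element that shares a key.
    (flatten1 ++ flatten2).groupByKey()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByKeySketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Sort before printing: groupByKey does not guarantee element order.
      groupedValues(sc).collect().foreach { case (key, values) =>
        println(s"$key -> ${values.toList.sorted.mkString(", ")}")
      }
    } finally sc.stop()
  }
}
```

One difference from the join version is worth keeping in mind: because this is a union followed by a grouping, a key that appears in only one of the two RDDs still shows up in the result, whereas join drops it.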