Quantad - 2 months ago
Scala Question

Spark: Get elements of an RDD based on the elements of an array in another RDD

In the Spark Scala framework, I have an RDD, rdd1, in which each element represents a single element of a matrix A:

val rdd1 = dist.map{case ((x, y), z, v) => ((x, y), v)}

Here x represents the row, y represents the column, and v represents the value in matrix A.

I also have another RDD, rdd2, of the form RDD[(index, Array[(x, y)])], where the array in each element lists the elements of matrix A (stored in rdd1) that are needed for the index in that element.

What I need to do now is get the values of the matrix A elements for each index, preserving all of the data: index, (x, y) and v. What would be a good approach to doing this?

Answer

If I understand correctly, your question boils down to:

val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
  ((0, 0), 5.5),            
  ((1, 0), 7.7)
))

val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
  (123, Array((0, 0), (1, 0))) 
))

And you want to merge these RDDs to get all the values (index, (x, y), v), in this case (123, (0, 0), 5.5) and (123, (1, 0), 7.7)?

You can definitely do this using join, since both RDDs share the coordinates (x, y), but since one of them actually holds an Array[(x, y)], you'd first have to explode that array into a set of rows:

val explodedIndices = indicesRdd.flatMap{case (index, coords: Array[(Int, Int)]) => coords.map{case (x, y) => (index, (x, y))}}
// Each row exploded into multiple rows (index, (x, y))

val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)

val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)

// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)

// joined has type RDD[((Int, Int), ((Int, (Int, Int)), ((Int, Int), Double)))];
// one more map flattens each row back down to (index, (x, y), v):
val result = joined.map{case ((x, y), ((index, _), (_, v))) => (index, (x, y), v)}
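For reference, the same explode / keyBy / join pipeline can be traced with plain Scala collections, so it runs without a SparkContext. This is only a sketch of the element-wise logic (a for-comprehension stands in for the RDD join); the Spark version distributes the same steps across partitions:

```scala
object JoinSketch {
  def main(args: Array[String]): Unit = {
    // ((x, y), v) rows, as in valuesRdd
    val values = Seq(((0, 0), 5.5), ((1, 0), 7.7))
    // (index, Array[(x, y)]) rows, as in indicesRdd
    val indices = Seq((123, Array((0, 0), (1, 0))))

    // Explode each Array[(x, y)] into one (index, (x, y)) row per coordinate
    val exploded = indices.flatMap { case (index, coords) =>
      coords.map(xy => (index, xy))
    }

    // Key both sides by the coordinates (x, y)
    val keyedIndices = exploded.map { case (index, xy) => (xy, (index, xy)) }
    val keyedValues  = values.map { case (xy, v) => (xy, (xy, v)) }

    // A local stand-in for RDD.join: pair up rows whose keys match
    val joined = for {
      (k1, iv) <- keyedIndices
      (k2, vv) <- keyedValues
      if k1 == k2
    } yield (k1, (iv, vv))

    // Flatten each joined row to (index, (x, y), v)
    val result = joined.map { case ((x, y), ((index, _), (_, v))) => (index, (x, y), v) }

    println(result) // List((123,(0,0),5.5), (123,(1,0),7.7))
  }
}
```

The intermediate shapes are identical to the RDD version at each step, which makes it easy to check where a key mismatch would drop rows.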