Quantad - 1 year ago 114
Scala Question

# Spark: Get elements of an RDD based on the elements of an array in another RDD

In Spark (Scala), I have an RDD, `rdd1`, in which each element represents a single element of a matrix `A`:

`val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}`

`x` represents the row, `y` represents the column and `v` represents the value in matrix `A`.

I also have another RDD, `rdd2`, of the form `RDD[(index, Array[(x, y)])]`, where the array in each element lists the coordinates of the elements of matrix `A` (stored in `rdd1`) that are needed for that element's `index`.

Now I need to get the values of the matrix `A` elements for each `index`, preserving all of the data: `index`, `(x,y)` and `v`. What would be a good approach to doing this?

If I understand correctly, your question boils down to:

```scala
val valuesRdd = sc.parallelize(Seq(
  // ((x, y), v)
  ((0, 0), 5.5),
  ((1, 0), 7.7)
))

val indicesRdd = sc.parallelize(Seq(
  // (index, Array[(x, y)])
  (123, Array((0, 0), (1, 0)))
))
```

And you want to merge these RDDs to get all values `(index, (x, y), v)`; in this case, `(123, (0,0), 5.5)` and `(123, (1,0), 7.7)`?

You can definitely do this using `join`, since both RDDs share the key `(x, y)`. However, one of them holds an `Array[(x, y)]` per row, so you have to explode that into one row per coordinate first:

```scala
// Explode each row into multiple rows (index, (x, y))
val explodedIndices = indicesRdd.flatMap { case (index, coords: Array[(Int, Int)]) =>
  coords.map { case (x, y) => (index, (x, y)) }
}

// Key each row by the coordinates (x, y)
val keyedIndices = explodedIndices.keyBy { case (index, (x, y)) => (x, y) }

// Key each row by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy { case ((x, y), v) => (x, y) }

// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
```
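
The rows of `joined` have the shape `((x, y), ((index, (x, y)), ((x, y), v)))`, so one more `map` is needed to reach `(index, (x, y), v)`. The following is a minimal sketch of the whole pipeline, not part of the original answer. It assumes a local `SparkContext` (the app name and the `local[*]` master are placeholders), and the names `JoinByCoordinates`, `Coord` and `valuesPerIndex` are made up for illustration. It also keys the exploded rows by `(x, y)` directly, which should make the `keyBy` calls unnecessary because `valuesRdd` is already keyed by the coordinates:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinByCoordinates {
  // Hypothetical alias for a (row, column) pair.
  type Coord = (Int, Int)

  // Returns one (index, (x, y), v) row for every coordinate listed under an index.
  def valuesPerIndex(
      values: RDD[(Coord, Double)],
      indices: RDD[(Int, Array[Coord])]
  ): RDD[(Int, Coord, Double)] = {
    // Explode each array into ((x, y), index) pairs so the coordinates become the key.
    val indexByCoord = indices.flatMap { case (index, coords) =>
      coords.map(coord => (coord, index))
    }

    // Inner join on (x, y), then reorder the fields into the requested shape.
    indexByCoord.join(values).map { case (coord, (index, v)) =>
      (index, coord, v)
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder configuration for running the sketch locally.
    val conf = new SparkConf().setAppName("join-by-coordinates").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val valuesRdd = sc.parallelize(Seq(((0, 0), 5.5), ((1, 0), 7.7)))
    val indicesRdd = sc.parallelize(Seq((123, Array((0, 0), (1, 0)))))

    valuesPerIndex(valuesRdd, indicesRdd).collect().foreach(println)

    sc.stop()
  }
}
```

For the sample data, the result should contain `(123, (0,0), 5.5)` and `(123, (1,0), 7.7)`; the row order is not guaranteed. Note that `join` is an inner join, so any coordinate listed in `indicesRdd` that has no entry in `valuesRdd` is dropped. If those rows need to be kept, `leftOuterJoin` returns the value as an `Option` instead.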
