Quantad - 8 months ago 54

Scala Question

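In Spark (Scala), I have an RDD, `rdd1`, in which each element represents one element of a matrix `A`:

`val rdd1 = dist.map{case (((x,y),z,v)) => ((x,y),v)}`

Here `x` and `y` are the coordinates of the element and `v` is its value in `A`.

I also have another RDD, `rdd2`, of the form `RDD[index, Array[(x, y)]]`, where each array lists the elements of `A` (stored in `rdd1`) that belong to the given `index`.

Now what I need to do is get the values of the matrix `A` for each `index`, keeping the `index`, the `(x,y)` coordinates and the value `v` together.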

Answer

If I understand correctly, your question boils down to:

```
val valuesRdd = sc.parallelize(Seq(
//((x, y), v)
((0, 0), 5.5),
((1, 0), 7.7)
))
val indicesRdd = sc.parallelize(Seq(
//(index, Array[(x, y)])
(123, Array((0, 0), (1, 0)))
))
```

And you want to merge these RDDs to get all values `(index, (x, y), v)`, in this case `(123, (0,0), 5.5)` and `(123, (1,0), 7.7)`?

You can definitely do this using `join`, since both RDDs have a common column `(x, y)`. But since one of them actually holds an `Array[(x, y)]`, you'd have to explode that into a set of rows first:

```
val explodedIndices = indicesRdd.flatMap {
  case (index, coords: Array[(Int, Int)]) =>
    coords.map { case (x, y) => (index, (x, y)) }
}
// Each row exploded into multiple rows (index, (x, y))
val keyedIndices = explodedIndices.keyBy{case (index, (x, y)) => (x, y)}
// Each row keyed by the coordinates (x, y)
val keyedValues = valuesRdd.keyBy{case ((x, y), v) => (x, y)}
// Each row keyed by the coordinates (x, y)
// Because we have common keys, we can join!
val joined = keyedIndices.join(keyedValues)
```
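Note that the `joined` RDD above still has nested rows of the shape `((x, y), ((index, (x, y)), ((x, y), v)))`, so one more `map` is needed to reach `(index, (x, y), v)`. A minimal end-to-end sketch follows, assuming Spark core is on the classpath and that the types are `Int` coordinates, `Int` index and `Double` values as in the sample data. The object name `JoinByCoordinates`, the method `attachValues`, the local master and the app name are made up for illustration. It also takes a small shortcut: since `valuesRdd` is already keyed by `(x, y)`, only the exploded indices need re-keying.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinByCoordinates {
  // Hypothetical helper: returns one row (index, (x, y), v) per coordinate
  // listed in `indices` that also has a value in `values`.
  def attachValues(
      values: RDD[((Int, Int), Double)],
      indices: RDD[(Int, Array[(Int, Int)])]
  ): RDD[(Int, (Int, Int), Double)] = {
    // Explode each array and key by coordinates in one step: ((x, y), index)
    val indexByCoord = indices.flatMap { case (index, coords) =>
      coords.map(coord => (coord, index))
    }
    // `values` is already keyed by (x, y), so it can be joined directly.
    // The inner join drops coordinates that have no value in `values`.
    indexByCoord.join(values).map { case (coord, (index, v)) =>
      (index, coord, v)
    }
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[*]").setAppName("join-by-coordinates")
    val sc = new SparkContext(conf)
    try {
      val valuesRdd = sc.parallelize(Seq(((0, 0), 5.5), ((1, 0), 7.7)))
      val indicesRdd = sc.parallelize(Seq((123, Array((0, 0), (1, 0)))))
      attachValues(valuesRdd, indicesRdd).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

For the sample data this produces the two rows `(123,(0,0),5.5)` and `(123,(1,0),7.7)`; the order after a join is not guaranteed. If coordinates without a matching value should be kept, `leftOuterJoin` would be the variant to look at, at the cost of an `Option[Double]` in the result.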