Darshan Darshan - 2 months ago
Scala Question

Convert Array[(String,String)] type to RDD[(String,String)] type in Spark

I am new to Spark.

Here is my code:

val Data = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))

val DataArray = sc.broadcast(Data.collect)

val FinalData = DataArray.value


Here FinalData is of type Array[(String, String)], but I want the data to be of type RDD[(String, String)].

Can I convert FinalData to type RDD[(String, String)]?

More Detail:

I want to join two RDDs. To make the join faster, I am broadcasting the small RDD to the whole cluster so that less data is shuffled (which should indirectly improve performance). For this I am writing something like:

//Big Data
val FirstRDD = sc.parallelize(List(****Data of first table****))

//Small Data
val SecondRDD = sc.parallelize(List(****Data of Second table****))


So I will broadcast the small data set (i.e. SecondRDD):

val DataArray = sc.broadcast(SecondRDD.collect)

val FinalData = DataArray.value


// This line fails with the error "Found Array required RDD":
val Join = FirstRDD.leftOuterJoin(FinalData)


That's why I am looking for Array to RDD conversion.

Answer

Broadcasts are indeed useful to improve the performance of a join between a large RDD and a small one. When you use one that way, the broadcast (together with map or mapPartitions) replaces the join rather than being used in it, so you never need to "transform a broadcast into an RDD".

Here's how it would look:

val largeRDD = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))

val smallRDD = sc.parallelize(List(
  ("I", 34),
  ("U", 45)))

val smaller = sc.broadcast(smallRDD.collectAsMap())

// smaller.value is read inside the function passed to RDD.map, i.e. on the
// executors. The broadcast ensures the map is shipped to each executor only once.
val joinResult = largeRDD.map { case (k, v) => (k, v, smaller.value.get(k)) }

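// collect() brings the results to the driver so the output appears there
// (foreach(println) on its own would print on the executors)
joinResult.collect().foreach(println)
// prints:
// (I,India,Some(34))
// (U,USA,Some(45))
// (W,West,None)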
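For comparison, the literal Array-to-RDD conversion the question asks about does exist: sc.parallelize turns the collected Array back into an RDD, and leftOuterJoin then compiles. The minimal sketch below assumes local mode and spark-core on the classpath; the object name ParallelizeJoinSketch and its results method are hypothetical. It shows that both routes produce the same (key, (left, Option[right])) pairs, but the leftOuterJoin route shuffles both RDDs by key, which is exactly the cost the broadcast was meant to avoid.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; assumes spark-core on the classpath and local mode.
object ParallelizeJoinSketch {
  type Row = (String, (String, Option[Int]))

  def results(): (Seq[Row], Seq[Row]) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("parallelize-vs-broadcast").setMaster("local[2]"))
    try {
      val largeRDD = sc.parallelize(List(("I", "India"), ("U", "USA"), ("W", "West")))
      val smallRDD = sc.parallelize(List(("I", 34), ("U", 45)))

      // collect() yields an Array[(String, Int)] on the driver...
      val smallArray: Array[(String, Int)] = smallRDD.collect()
      // ...and sc.parallelize turns that Array back into an RDD[(String, Int)].
      val smallAgain = sc.parallelize(smallArray.toSeq)

      // Compiles, but leftOuterJoin shuffles both sides by key.
      val viaShuffle = largeRDD.leftOuterJoin(smallAgain)

      // Broadcast lookup shaped like leftOuterJoin's output: (key, (left, Option[right])).
      val smaller = sc.broadcast(smallRDD.collectAsMap())
      val viaBroadcast = largeRDD.map { case (k, v) => (k, (v, smaller.value.get(k))) }

      // Sort on the driver because join output order is not guaranteed.
      (viaShuffle.collect().sortBy(_._1).toSeq, viaBroadcast.collect().sortBy(_._1).toSeq)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (viaShuffle, viaBroadcast) = results()
    viaShuffle.foreach(println)
    println(viaShuffle == viaBroadcast)
  }
}
```

The map-side version keeps the shape of a leftOuterJoin result, so it can be dropped in where the original code expected FirstRDD.leftOuterJoin(...) output.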

A variant of the broadcast approach that uses mapPartitions instead of map might be slightly more efficient.
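A minimal sketch of that mapPartitions variant, assuming local mode and spark-core on the classpath (the object name MapPartitionsJoinSketch is hypothetical): the broadcast value is dereferenced once per partition and the resulting map is reused for every record in that partition. Since Spark already caches a broadcast value on each executor, the saving over the plain map version is likely modest; mapPartitions pays off most when the per-partition setup is expensive.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; assumes spark-core on the classpath and local mode.
object MapPartitionsJoinSketch {
  def results(): Seq[(String, String, Option[Int])] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-mappartitions").setMaster("local[2]"))
    try {
      val largeRDD = sc.parallelize(List(("I", "India"), ("U", "USA"), ("W", "West")))
      val smallRDD = sc.parallelize(List(("I", 34), ("U", 45)))
      val smaller = sc.broadcast(smallRDD.collectAsMap())

      val joinResult = largeRDD.mapPartitions { iter =>
        // Read the broadcast value once per partition, then reuse it for every record.
        val lookup = smaller.value
        iter.map { case (k, v) => (k, v, lookup.get(k)) }
      }

      // collect() concatenates partitions in order, so this follows the input order.
      joinResult.collect().toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    results().foreach(println)
}
```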
