José María Luna - 1 month ago

How to fill a variable inside a map - Scala Spark

I have to read a text file and save its values in a variable of type

Map[Int, collection.mutable.Map[Int, Double]]

I have done it with a foreach and a broadcast variable, and it works properly on my local machine, but not on a yarn-cluster: there the foreach stage takes far too long, while the same job finishes in about 1 minute on my local computer.

val data = sc.textFile(fileOriginal)

// split each line on ';' and parse every field as a Double
val dataRDD = data.map(s => s.split(';').map(_.toDouble)).cache()

val datos = collection.mutable.Map[Int, collection.mutable.Map[Int, Double]]()
val bcDatos = sc.broadcast(datos)

dataRDD.foreach { x =>
  val key = x(0).toInt
  val value = x(2) / x(3) * 100
  if (bcDatos.value.contains(key)) {
    bcDatos.value(key).put(x(1).toInt, value)
  } else {
    bcDatos.value.put(key, collection.mutable.Map(x(1).toInt -> value))
  }
}


My question is: How can I do the same, but using map? Can I "fill" a variable with that structure inside a map?

Thank you

Answer

When using Spark - you should never try using mutable structures in a distributed manner - that's simply not supported. If you mutate a variable created in driver code (whether broadcast or not), a copy of that variable is mutated on each executor separately, and you'll never be able to "merge" these mutated partial results and send them back to the driver.
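For instance, here is a minimal sketch (assuming a running SparkContext named sc) of why this appears to work in local mode but silently loses data on a cluster:

// each executor deserializes its own copy of the broadcast map;
// mutations happen on those copies, never on the driver's instance
val m = collection.mutable.Map[Int, Int]()
val bc = sc.broadcast(m)
sc.parallelize(1 to 10).foreach(i => bc.value.put(i, i))
println(bc.value.size) // 0 on a real cluster; may "work" in local mode, where driver and executors share one JVM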

Instead - you should transform your RDD into a new (immutable!) RDD with the data you need.

If I managed to follow your logic correctly - this would give you the map you need:

// assuming dataRDD has type RDD[Array[Double]] and each array has at least 4 items:
val result: Map[Int, Map[Int, Double]] = dataRDD
  .keyBy(_(0).toInt)                                            // outer key
  .mapValues(arr => Map(arr(1).toInt -> arr(2) / arr(3) * 100)) // single-entry inner map
  .reduceByKey(_ ++ _)   // merge the inner maps per key - the equivalent of your put-into-existing-map branch
  .collectAsMap()
  .toMap                 // collectAsMap returns a collection.Map; convert it to an immutable Map
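
If the inner maps grow large, you can avoid building a throwaway single-entry Map per record. Here is a sketch of the same computation with aggregateByKey (the zero value and the two merge functions are my own choices, not code from the question):

val result2: Map[Int, Map[Int, Double]] = dataRDD
  .map(arr => (arr(0).toInt, (arr(1).toInt, arr(2) / arr(3) * 100)))
  .aggregateByKey(Map.empty[Int, Double])(
    (acc, kv) => acc + kv, // fold one (innerKey -> value) pair into the per-key accumulator
    (a, b) => a ++ b       // merge accumulators coming from different partitions
  )
  .collectAsMap()
  .toMap

Either way, the result is computed as an immutable value on the driver; if later jobs need it, broadcast result itself rather than mutating a shared structure.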