sarthak - 1 month ago

# Optimizing the Spark Code

I am trying to improve my Spark code:

``````scala
import scala.util.control.Breaks._

// collect all keys whose Int value is 1 into a local list
var lst = disOneRDDM.filter(x => x._2._1 == 1).keys.collect

val disTwoRDDM = disOneRDDM.map { x =>
  var b: Boolean = false
  breakable {
    for (str <- x._2._2)
      if (lst.contains(str)) {
        b = true
        break
      }
  }
  if (b)
    (x._1, (Math.min(2, x._2._1), x._2._2))
  else
    x
}.cache
``````

I have an RDD of the form `(String, (Int, List[String]))`. Every element in the `List[String]` has its own entry in this RDD, where it serves as a key. A sample input is shown below (this is the `disOneRDDM` in my code):

``````("abc",(10,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(10,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...
``````

My intent is: for each entry, if its `List[String]` contains an element whose own `Int` value is 1, change that entry's `Int` to `min(2, current_Int_value)`. For example, in the sample input the entry `"abc"` has a list containing `"efg"`, whose `Int` value is 1, and likewise `"hij"` has `"vcx"`. So I would expect output of the form:

``````("abc",(2,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(2,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...
``````

The RDD is huge, and the way I am doing it works but is very slow. In the code above, I filter the RDD for entries with `Int` value 1 and collect their keys into the list `lst`. Then, to find the entries that should get `Int` value 2, I iterate over each entry's list and check whether `lst` contains any of its elements; if it does, I break out of the loop and assign the appropriate `Int` value.

Is there a faster way to do this, for example without having to collect the huge RDD into a list?

As @a-spoty-spot commented, if there aren't too many unique values in `lst`, your best approach is to change it to a `Set` (which removes duplicates and gives O(1) lookups) and use a broadcast variable.
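For illustration, here is a minimal sketch of that approach on plain Scala collections: `ones` stands in for the value of a broadcast variable (in real Spark code you would build it with `sc.broadcast(...)` and read `ones.value` inside the closure), and `data` and `update` are hypothetical names, not from the original code.

``````scala
// Sketch of the broadcast-Set approach on plain collections.
// `ones` stands in for sc.broadcast(...).value; `data` is sample input.
val data = List(
  ("abc", (10, List("hij", "efg", "klm"))),
  ("efg", (1, List("jhg", "Beethan", "abc", "ert"))),
  ("hij", (10, List("vcx", "klm", "zse"))),
  ("vcx", (1, List("czx", "Beethan", "abc")))
)

// keys whose Int value is 1, as a Set for O(1) membership tests
val ones: Set[String] = data.filter(_._2._1 == 1).map(_._1).toSet

// the function you would pass to disOneRDDM.map(...); `exists` short-circuits
// like the breakable loop, and Set lookup avoids the O(n) lst.contains scan
def update(rec: (String, (Int, List[String]))): (String, (Int, List[String])) =
  rec match {
    case (k, (i, list)) if list.exists(ones.contains) => (k, (Math.min(2, i), list))
    case other => other
  }

val updated = data.map(update)
// ("abc", (2, ...)), since its list contains "efg", whose value is 1
``````

Because the `Set` is broadcast once per executor rather than captured per task, this keeps the original single-pass `map` structure while removing the linear scan over `lst`.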

Otherwise (if that list of unique keys can still be huge) - here's a solution that doesn't use `collect` at all, which means it can handle any size. However - since it increases the size of the RDD by using `flatMap` and performs a `join` (which entails a shuffle), I'm not sure it would be much faster, that depends on the specifics of your data and your cluster.

``````scala
import org.apache.spark.rdd.RDD

// create the lookup "map" (the Int values are irrelevant; we only need the keys)
val lookup: RDD[(String, Int)] = disOneRDDM.cache().filter(_._2._1 == 1).map(t => (t._1, 1))

val result = disOneRDDM
  .flatMap { // break each record into one record per list element, keyed by that element
    case (k, (i, list)) => list.map(s => (s, (k, i)))
  }
  .leftOuterJoin(lookup) // left join with the lookup; a match means this element's value is 1
  .map {
    case (item, ((k, i), Some(_))) => (k, (Math.min(2, i), item))
    case (item, ((k, i), None))    => (k, (i, item))
  }
  .groupByKey() // regroup by the original key, merging back into lists
  .map { case (k, iter) =>
    val l = iter.toList
    (k, (l.map(_._1).min, l.map(_._2)))
  }

result.foreach(println)
// (Beethan,(0,List(zse, efg, vcx)))
// (jhg,(10,List(cdz, swq, ghb, awq)))
// (hij,(2,List(klm, zse, vcx)))
// (zse,(1,List(Beethan, nbh, efg)))
// (efg,(1,List(Beethan, jhg, abc, ert)))
// (vcx,(1,List(Beethan, czx, abc)))
// (abc,(2,List(klm, hij, efg)))
``````
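To see why taking the `min` after regrouping recovers the right value, the same flatMap/join/group pipeline can be traced on plain Scala collections (a sketch with hypothetical sample data, where a `Set` plays the role of the `lookup` RDD):

``````scala
// Plain-collections trace of the flatMap / leftOuterJoin / groupByKey pipeline.
val data = List(
  ("abc", (10, List("hij", "efg", "klm"))),
  ("efg", (1, List("jhg", "Beethan", "abc", "ert")))
)
val lookup: Set[String] = data.filter(_._2._1 == 1).map(_._1).toSet // stands in for the lookup RDD

// one record per list element, keyed by that element
val exploded = data.flatMap { case (k, (i, list)) => list.map(s => (s, (k, i))) }

// a "joined" element lowers the value to min(2, i); others pass through
val joined = exploded.map {
  case (item, (k, i)) if lookup(item) => (k, (Math.min(2, i), item))
  case (item, (k, i))                 => (k, (i, item))
}

// regroup by original key: all records for a key carry i, except matched ones
// which carry min(2, i) -- so the min over the group is exactly the desired value
val result = joined.groupBy(_._1).map { case (k, recs) =>
  (k, (recs.map(_._2._1).min, recs.map(_._2._2)))
}
// result("abc") == (2, List("hij", "efg", "klm"))
``````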
Source (Stack Overflow)