
Optimizing the Spark Code

I am trying to improve my Spark code:

import scala.util.control.Breaks._

// keys whose Int value is 1, collected to the driver
val lst = disOneRDDM.filter(x => x._2._1 == 1).keys.collect

val disTwoRDDM = disOneRDDM.map(x => {
  var b: Boolean = false
  breakable {
    // stop scanning as soon as one list entry is found in lst
    for (str <- x._2._2)
      if (lst.contains(str)) {
        b = true
        break
      }
  }
  if (b)
    (x._1, (Math.min(2, x._2._1), x._2._2))
  else
    x
}).cache


I have RDDs of the form (String, (Int, List[String])). Every element of the List[String] has its own entry in this RDD, where it serves as a key. A sample input is shown below (this is the disOneRDDM in my code):

("abc",(10,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(10,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...


My intent is as follows: for each entry, if its List[String] contains an element whose Int value is 1, change the entry's own Int to min(2, current_Int_value). For example, in the input above, the entry "abc" has a list containing "efg", whose Int value is 1, and the entry "hij" likewise has "vcx". So I would expect output of the form:

("abc",(2,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(2,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...


The RDD is huge, and while this approach works, it is very slow. In the code above, I filter for the entries whose Int value is 1 and collect their keys into the list lst. Then, to find the elements whose Int value should become 2, I iterate over each element's list and check whether lst contains the entry; if it does, I break out of the loop and assign the appropriate Int value.

Is there a faster way to do this, for example without having to collect the huge RDD into a list?

Answer

As @a-spoty-spot commented, if there aren't too many unique values in lst, your best approach is to convert it to a Set (which removes duplicates) and broadcast it to the executors.

Otherwise (if the set of unique keys can still be huge), here's a solution that doesn't use collect at all, which means it can handle any size. However, since it increases the size of the RDD with flatMap and performs a join (which entails a shuffle), I'm not sure it will be much faster; that depends on the specifics of your data and your cluster.

import org.apache.spark.rdd.RDD

// create the lookup "map" (the int values are actually irrelevant, we just need the keys)
val lookup: RDD[(String, Int)] = disOneRDDM.cache().filter(_._2._1 == 1).map(t => (t._1, 1))

val result = disOneRDDM
  .flatMap { // break up each record into individual records for join
    case (k, (i, list)) => list.map(s => (s, (k, i)))
  }
  .leftOuterJoin(lookup).map { // left join with lookup and change int values if we found a match
    case (item, ((k, i), Some(_))) => (k, (Math.min(2, i), item))
    case (item, ((k, i), _)) => (k, (i, item))
  }
  .groupByKey().map { // group by key to merge back to lists, while mapping to the desired structure
    case (k, iter) =>
      val l = iter.toList
      (k, (l.map(_._1).min, l.map(_._2)))
  }

result.foreach(println)
// (Beethan,(0,List(zse, efg, vcx)))
// (jhg,(10,List(cdz, swq, ghb, awq)))
// (hij,(2,List(klm, zse, vcx)))
// (zse,(1,List(Beethan, nbh, efg)))
// (efg,(1,List(Beethan, jhg, abc, ert)))
// (vcx,(1,List(Beethan, czx, abc)))
// (abc,(2,List(klm, hij, efg)))