sarthak sarthak - 1 year ago 61
Scala Question

Optimizing the Spark Code

I am trying to improve my Spark code:

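import scala.util.control.Breaks._

val lst = disOneRDDM.filter(x => x._2._1 == 1).keys.collect
val disTwoRDDM = disOneRDDM.map(x => {
  var b: Boolean = false
  breakable {
    for (str <- x._2._2)
      if (lst.contains(str)) {
        b = true
        break
      }
  }
  if (b) (x._1, (Math.min(2, x._2._1), x._2._2)) else x
})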

I have an RDD of the form (String,(Int,List[String])). Every element in the List[String] has its own entry in this RDD, where it serves as a key. A sample input is shown below (this is the disOneRDDM in my code):


My intent is to find, in each entry's List[String], an element whose own Int value is 1 and, if there is one, change the entry's own Int value. For example, in the sample input, one entry has a list containing an element whose Int value is 1, and so does another entry. So I would expect an output of the form:


The size of the RDD is huge, and the way I am doing it works but is very slow. In the code above, I filter the RDD for entries whose Int value is 1 and form the list lst by collecting their keys. Then, to find the elements which should get the Int value 2, I iterate over each element's list entries and check whether lst contains the entry. If it does, I break out of the loop and assign the appropriate Int value.
Is there a faster way to do it, for example without having to collect the huge RDD in list?

Answer Source

As @a-spoty-spot commented, if lst doesn't contain too many unique values, your best approach is to convert it to a Set (which removes duplicates and replaces the linear scan of contains on a collected array with a hash lookup) and broadcast it.
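The Set-based lookup can be sketched without a cluster, as below. This is only a minimal illustration under stated assumptions: the names Record, relabel and keysWithOne and the sample data are made up for this example and do not come from the question, and the Math.min(2, ...) rule is taken from the join-based code in this answer.

```scala
// Hypothetical sketch: Record, relabel, keysWithOne and the data are illustrative only.
object BroadcastSetSketch {
  type Record = (String, (Int, List[String]))

  // Cap the Int at 2 when any list entry is a key whose own Int is 1;
  // otherwise return the record unchanged.
  def relabel(rec: Record, keysWithOne: Set[String]): Record = {
    val (key, (value, neighbours)) = rec
    if (neighbours.exists(keysWithOne.contains)) (key, (Math.min(2, value), neighbours))
    else rec
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data standing in for disOneRDDM
    val records: List[Record] = List(
      ("a", (1, List("b", "c"))),
      ("b", (10, List("a", "d"))),
      ("c", (0, List("a"))),
      ("d", (7, List("c")))
    )
    // Keys whose Int is exactly 1, as a Set for hash-based membership tests
    val keysWithOne: Set[String] = records.collect { case (k, (1, _)) => k }.toSet
    records.map(relabel(_, keysWithOne)).foreach(println)
    // (a,(1,List(b, c)))
    // (b,(2,List(a, d)))
    // (c,(0,List(a)))
    // (d,(7,List(c)))
  }
}
```

In Spark, assuming sc is your SparkContext, the same function could be applied with something like val bc = sc.broadcast(keysWithOne) followed by disOneRDDM.map(relabel(_, bc.value)), so that each executor receives the set once instead of with every task.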

Otherwise (if that list of unique keys can still be huge), here's a solution that doesn't use collect at all, which means it can handle any size. However, it increases the size of the RDD by using flatMap and performs a join (which entails a shuffle), so I'm not sure it would be much faster. That depends on the specifics of your data and your cluster.

import org.apache.spark.rdd.RDD

// create the lookup "map" (the int values are actually irrelevant, we just need the keys)
val lookup: RDD[(String, Int)] = disOneRDDM.cache().filter(_._2._1 == 1).map(t => (t._1, 1))

val result = disOneRDDM
  .flatMap { // break up each record into individual records for join (records with an empty list yield no rows)
    case (k, (i, list)) => list.map(s => (s, (k, i)))
  }
  .leftOuterJoin(lookup).map { // left join with lookup and change int values if we found a match
    case (item, ((k, i), Some(_))) => (k, (Math.min(2, i), item))
    case (item, ((k, i), _)) => (k, (i, item))
  }
  .groupByKey().map { // group by key to merge back to lists, while mapping to the desired structure
    case (k, iter) =>
      val l = iter.toList
      (k, (l.map(_._1).min, l.map(_._2))) // the smallest int wins, so a single match caps the value at 2
  }

result.foreach(println)

// (Beethan,(0,List(zse, efg, vcx)))
// (jhg,(10,List(cdz, swq, ghb, awq)))
// (hij,(2,List(klm, zse, vcx)))
// (zse,(1,List(Beethan, nbh, efg)))
// (efg,(1,List(Beethan, jhg, abc, ert)))
// (vcx,(1,List(Beethan, czx, abc)))
// (abc,(2,List(klm, hij, efg)))