Knight71 Knight71 - 2 months ago 10
Scala Question

RDD split and do aggregation on new RDDs

I have an RDD of (String, String, Int).


  1. I want to reduce it based on the first two strings.

  2. Then, based on the first String, I want to group the (String, Int) pairs and sort them.

  3. After sorting, I need to split them into small groups of n elements each.



I have written the code below. The problem is that the number of elements in step 2 is very large for a single key, and the reduceByKey((x, y) => x ++ y) takes a lot of time.

import scala.collection.mutable.ArrayBuffer

// Input
val data = Array(
("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1),
("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))

// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)

// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList))


The problem is that "c1" has a lot of unique entries (b1, b2, ... up to millions), and reduceByKey is killing time because all the values go to a single node.
Is there a way to achieve this more efficiently?

// output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))

Answer Source

There are at least a few problems with the way you group your data. The first problem is introduced by

 mapValues(x => ArrayBuffer(x))

It creates a large number of mutable objects which provide no additional value, since you cannot leverage their mutability in the subsequent reduceByKey

reduceByKey((x, y) => x ++ y) 

where each ++ creates a new collection and neither argument can be safely mutated. Since reduceByKey applies map-side aggregation, the situation is even worse and pretty much creates GC hell.
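To make the allocation point concrete, here is a small plain-Scala sketch (no Spark involved; the object and value names are made up for illustration). It only shows how `++` and `++=` behave on ArrayBuffer, which is why starting from a mutable buffer buys nothing in that reduceByKey:

```scala
import scala.collection.mutable.ArrayBuffer

object ConcatSketch {
  val x = ArrayBuffer(("a1", 2))
  val y = ArrayBuffer(("b1", 2))

  // `++` leaves both operands untouched and allocates a third buffer,
  // so every merge in reduceByKey((x, y) => x ++ y) copies all elements again.
  val merged = x ++ y

  // `++=` appends in place and returns the left operand itself.
  // This is the mutation that is not safe on reduceByKey arguments.
  val acc = ArrayBuffer(("a1", 2))
  val same = acc ++= y
}
```

With a million distinct values under one key, the copying variant allocates a fresh, ever larger buffer on each merge step.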

Is there a way to achieve this more efficiently?

Unless you have some deeper knowledge about the data distribution that can be used to define a smarter partitioner, the simplest improvement is to replace mapValues + reduceByKey with a simple groupByKey:

val r3 = r2.groupByKey
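For reference, the whole pipeline from the question with that replacement might look roughly like the sketch below. It assumes Spark is on the classpath; the object name, the local[2] master and the app name are placeholders of mine, and the group size of 2 is taken from the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeySketch {
  // Runs the pipeline on the sample data and returns the result as a Map.
  def run(): Map[String, List[List[(String, Int)]]] = {
    // Local master and app name are illustrative only.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("group-by-key-sketch"))
    try {
      val data = Seq(
        ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
        ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))

      // Step 1: sum the counts per (first, second) string pair, then re-key by the first string.
      val r2 = sc.parallelize(data)
        .map { case (k1, k2, n) => ((k1, k2), n) }
        .reduceByKey(_ + _)
        .map { case ((k1, k2), n) => (k1, (k2, n)) }

      // Steps 2 and 3: group per first string, sort, split into chunks of 2.
      val r4 = r2.groupByKey()
        .mapValues(_.toList.sorted.grouped(2).toList)

      r4.collect().toMap
    } finally {
      sc.stop()
    }
  }
}
```

Note that this still materializes all values of a key on one executor; it only removes the repeated buffer copying.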

It should also be possible to use a custom partitioner for both reduceByKey calls, and to use mapPartitions with preservesPartitioning instead of map.

class FirsElementPartitioner(partitions: Int)
    extends org.apache.spark.Partitioner {
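  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    // Partition on the first element of the key only.
    // % can be negative for negative hash codes, so shift into [0, numPartitions).
    val mod = key.asInstanceOf[(Any, Any)]._1.## % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }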
}

val r2 = r1
  .reduceByKey(new FirsElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(iter => iter.map(x => ((x._1._1), (x._1._2, x._2))), true)

// No shuffle required here.
val r3 = r2.groupByKey

This requires only a single shuffle, and groupByKey is simply a local operation:

r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
//  |  MapPartitionsRDD[40] at mapPartitions at <console>:35 []
//  |  ShuffledRDD[39] at reduceByKey at <console>:34 []
//  +-(8) MapPartitionsRDD[1] at map at <console>:28 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:26 []
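The reason groupByKey can stay local is that the partition index depends only on the first element of the key. A standalone sketch of that arithmetic, with no Spark dependency (partitionFor is a hypothetical helper written for illustration, not part of any API):

```scala
object PartitionIndexSketch {
  // Hypothetical helper mirroring getPartition: hash only the first tuple element.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.asInstanceOf[(Any, Any)]._1.## % numPartitions
    // `%` keeps the sign of the dividend, and a partition index has to be
    // in [0, numPartitions), so shift negative results up.
    if (raw < 0) raw + numPartitions else raw
  }
}
```

Every ("c1", _) key gets the same index, so all of its values already sit in one partition after the first shuffle. One caveat, as far as I can tell: the cast assumes tuple keys, so once the RDD is re-keyed by the first string alone, anything that actually calls the partitioner on those keys would fail with a ClassCastException. The groupByKey above does not call it, because the partitioner is already the one in place.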