Knight71 - 2 months ago 10

Scala Question

I have an RDD of `(String,String,Int)`:

- I want to reduce it based on the first two strings.
- Then, based on the first string, I want to group the `(String,Int)` pairs and sort them.
- After sorting, I need to split them into small groups, each containing n elements.

I have written the code below. The problem is that the number of elements in step 2 is very large for a single key, and the `reduceByKey(x ++ y)` takes a long time.

```
import scala.collection.mutable.ArrayBuffer

// Input (sc is the SparkContext provided by spark-shell)
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1), ("c1","a2",1), ("c1","b2",1),
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)

val r1 = rdd.map(x => ((x._1, x._2), x._3))

val r2 = r1.reduceByKey((x, y) => x + y).map(x => (x._1._1, (x._1._2, x._2)))

// This is taking a long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)

// From the list I will do the grouping.
val r4 = r3.map(x => (x._1, x._2.toList.sorted.grouped(2).toList))
```

The problem is that "c1" has a lot of unique entries like b1, b2, ... (millions of them), and the `reduceByKey` takes a long time.

Is there a way to achieve this more efficiently?

```
// Output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
```
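To pin down the intended result independently of Spark, the three steps can be sketched on a plain Scala `Seq`. This is only a local sketch for checking semantics against the output above (it does not scale, and `LocalSemantics` and `run` are hypothetical names):

```scala
object LocalSemantics {
  val data: Seq[(String, String, Int)] = Seq(
    ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1), ("c1", "a2", 1), ("c1", "b2", 1),
    ("c2", "a2", 1), ("c1", "a1", 1), ("c1", "b1", 1), ("c2", "a1", 1))

  def run(rows: Seq[(String, String, Int)], n: Int): Map[String, List[List[(String, Int)]]] =
    rows
      // Step 1: sum the Int for every (first, second) pair
      .groupBy { case (c, a, _) => (c, a) }
      .toList // keep a List so pairs sharing the first string are not collapsed
      .map { case ((c, a), group) => (c, (a, group.map(_._3).sum)) }
      // Step 2: collect the (second, sum) pairs under the first string
      .groupBy(_._1)
      .map { case (c, pairs) =>
        // Step 2 (sorting) and step 3 (chunks of n elements)
        c -> pairs.map(_._2).sorted.grouped(n).toList
      }
}
```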

Answer

There are at least a few problems with the way you group your data. The first problem is introduced by

```
mapValues(x => ArrayBuffer(x))
```

It creates a large number of mutable objects, which provide no additional value since you cannot leverage their mutability in the subsequent `reduceByKey`:

```
reduceByKey((x, y) => x ++ y)
```

where each `++` creates a new collection and neither argument can be safely mutated. Since `reduceByKey` applies map-side aggregation, the situation is even worse and pretty much creates GC hell.
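To make the cost of `++` concrete, here is a plain-Scala sketch with no Spark involved (`ConcatCost`, `mergeWithConcat` and `mergeInPlace` are hypothetical names). It assumes the values for one hot key are merged one at a time, which is roughly how map-side combining folds each new value into the running result. With `x ++ y`, every step allocates a fresh buffer, so n values cost 2 + 3 + ... + n element writes, while appending in place writes each new element once.

```scala
import scala.collection.mutable.ArrayBuffer

object ConcatCost {
  // Merge n singleton buffers the way the question's reduceByKey does:
  // each step builds a brand-new buffer with acc ++ ArrayBuffer(i).
  // Returns the merged buffer and the number of elements written into new buffers.
  def mergeWithConcat(n: Int): (ArrayBuffer[Int], Long) = {
    var acc = ArrayBuffer(0)
    var written = 0L
    for (i <- 1 until n) {
      val next = acc ++ ArrayBuffer(i) // fresh buffer; acc itself is left untouched
      written += next.size
      acc = next
    }
    (acc, written)
  }

  // Same merge, appending into one buffer; returns the number of appends.
  def mergeInPlace(n: Int): (ArrayBuffer[Int], Long) = {
    val acc = ArrayBuffer(0)
    var written = 0L
    for (i <- 1 until n) {
      acc += i
      written += 1
    }
    (acc, written)
  }
}
```

For n = 1000 the concatenating version writes 500,499 elements versus 999 appends, and in Spark every discarded intermediate buffer is also garbage the JVM has to collect.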

> Is there a way to achieve this more efficiently?

Unless you have some deeper knowledge about the data distribution that can be used to define a smarter partitioner, the simplest improvement is to replace `mapValues` + `reduceByKey` with a simple `groupByKey`:

```
val r3 = r2.groupByKey
```

It should also be possible to use a custom partitioner for both `reduceByKey` calls, and `mapPartitions` with `preservesPartitioning` instead of `map`, so that the second step does not shuffle the data again.

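```
import org.apache.spark.Partitioner

// Partitions by the first element of a pair key (or by the key itself otherwise),
// so ("c1", "a1") used by reduceByKey and "c1" used after re-keying agree.
class FirstElementPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    val k = key match {
      case (first, _) => first
      case other      => other
    }
    // ## can be negative, so shift the remainder into [0, numPartitions)
    val mod = k.## % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}
val r2 = r1
  .reduceByKey(new FirstElementPartitioner(8), (x, y) => x + y)
  .mapPartitions(
    iter => iter.map { case ((first, second), count) => (first, (second, count)) },
    preservesPartitioning = true)
// No shuffle required here.
val r3 = r2.groupByKey
```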
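Whichever way the partitioner is written, two properties matter. First, `getPartition` must return a value in `[0, numPartitions)`, and `##` (like `hashCode`) can be negative, so a plain `%` is not enough. Second, a `(first, second)` key should land where the bare `first` key would, so the preserved partitioner still describes the re-keyed RDD correctly. A Spark-free sketch of such a function (the names `PartitionCheck` and `firstElementPartition` are hypothetical) lets both properties be checked directly:

```scala
object PartitionCheck {
  // Stand-in for a getPartition implementation: use the first element of a
  // pair key, or the key itself, and keep the result in [0, numPartitions).
  def firstElementPartition(key: Any, numPartitions: Int): Int = {
    val k = key match {
      case (first, _) => first
      case other      => other
    }
    val mod = k.## % numPartitions // the remainder is negative when ## is negative
    if (mod < 0) mod + numPartitions else mod
  }
}
```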

It requires only a single shuffle, and `groupByKey` is simply a local operation:

```
r3.toDebugString
// (8) MapPartitionsRDD[41] at groupByKey at <console>:37 []
// | MapPartitionsRDD[40] at mapPartitions at <console>:35 []
// | ShuffledRDD[39] at reduceByKey at <console>:34 []
// +-(8) MapPartitionsRDD[1] at map at <console>:28 []
// | ParallelCollectionRDD[0] at parallelize at <console>:26 []
```