Laurens - 2 months ago
Scala Question

Spark sum by key without reducing rows

I have an RDD with the following structure:

(lang, id, name, max, min)


I want to add another column, total, which holds the difference between the maximum value of column max and the minimum value of column min for every unique lang (without reducing the number of rows). So I would get something like:

rdd:
+----+--+----+---+---+
|lang|id|name|max|min|
+----+--+----+---+---+
|  en|  |    |  5|  1|
|  en|  |    |  2|  0|
|  de|  |    |  9|  2|
|  en|  |    |  7|  1|
|  nl|  |    |  3|  0|
|  nl|  |    |  5|  1|
+----+--+----+---+---+


To

rdd:
+----+--+----+---+---+-----+
|lang|id|name|max|min|total|
+----+--+----+---+---+-----+
|  en|  |    |  5|  1|    7|
|  en|  |    |  2|  0|    7|
|  de|  |    |  9|  2|    7|
|  en|  |    |  7|  1|    7|
|  nl|  |    |  3|  0|    5|
|  nl|  |    |  5|  1|    5|
+----+--+----+---+---+-----+
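To make the expected total concrete, here is the same computation on a plain Scala Seq rather than an RDD: en gets max(5, 2, 7) - min(1, 0, 1) = 7, de gets 9 - 2 = 7, and nl gets max(3, 5) - min(0, 1) = 5. This is only a sketch of the semantics; the blank id and name values are filled with empty strings as placeholders.

```scala
// Sample rows from the question as (lang, id, name, max, min);
// id and name are placeholders because the table leaves them blank.
val rows = Seq(
  ("en", "", "", 5, 1), ("en", "", "", 2, 0),
  ("de", "", "", 9, 2), ("en", "", "", 7, 1),
  ("nl", "", "", 3, 0), ("nl", "", "", 5, 1)
)

// For each lang: (largest value of max) - (smallest value of min)
val totalByLang: Map[String, Int] =
  rows.groupBy(_._1).map { case (lang, group) =>
    lang -> (group.map(_._4).max - group.map(_._5).min)
  }

// Attach the per-lang total to every row, keeping all rows
val withTotal = rows.map { case (lang, id, name, max, min) =>
  (lang, id, name, max, min, totalByLang(lang))
}
```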


For compatibility reasons, I want to achieve this without using DataFrames/Spark SQL.

Any suggestion is much appreciated!

Answer

You can compute the per-key totals with aggregateByKey:

val rdd = sc.parallelize(Seq(
  ("en", "id1", "name1", 5,  1), ("en", "id2", "name2", 2,  0), 
  ("de", "id3", "name3", 9,  2), ("en", "id4", "name4", 7,  1),
  ("nl", "id5", "name5", 3,  0), ("nl", "id6", "name6", 5,  1)
))

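// Per lang: (max of max, min of min), starting from the identity values
val totals = rdd.keyBy(_._1).aggregateByKey((Long.MinValue, Long.MaxValue))(
  // seqOp: fold one row into the running (max, min) within a partition
  { case ((maxA, minA), (_, _, _, maxX, minX)) =>
    (Math.max(maxA, maxX), Math.min(minA, minX)) },
  // combOp: merge the partial (max, min) results of different partitions
  { case ((maxA1, minA1), (maxA2, minA2)) =>
    (Math.max(maxA1, maxA2), Math.min(minA1, minA2)) }
).mapValues { case (max, min) => max - min }  // total = max - min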
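aggregateByKey takes a zero value, a seqOp that folds each value into a partition-local accumulator, and a combOp that merges the accumulators from different partitions. The sketch below mimics those two steps for the key en with plain Scala collections; the split into two partitions is an assumption for illustration only, since Spark decides the real partitioning. Because (Long.MinValue, Long.MaxValue) is the identity for (max, min), it is safe to start every partition from it.

```scala
// Rows for the key "en", split across two hypothetical partitions
val partitions: Seq[Seq[(String, String, String, Int, Int)]] = Seq(
  Seq(("en", "id1", "name1", 5, 1), ("en", "id2", "name2", 2, 0)),
  Seq(("en", "id4", "name4", 7, 1))
)

// Identity element for (max, min)
val zero: (Long, Long) = (Long.MinValue, Long.MaxValue)

// seqOp: fold one row into a partition-local (max, min) accumulator
val seqOp: ((Long, Long), (String, String, String, Int, Int)) => (Long, Long) = {
  case ((maxA, minA), (_, _, _, maxX, minX)) =>
    (math.max(maxA, maxX.toLong), math.min(minA, minX.toLong))
}

// combOp: merge accumulators produced by different partitions
val combOp: ((Long, Long), (Long, Long)) => (Long, Long) = {
  case ((maxA1, minA1), (maxA2, minA2)) =>
    (math.max(maxA1, maxA2), math.min(minA1, minA2))
}

// Fold each partition from the zero value, then merge the partial results
val partial = partitions.map(_.foldLeft(zero)(seqOp))
val (maxEn, minEn) = partial.reduce(combOp)
val totalEn = maxEn - minEn
```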

join with the original data:

val vals = rdd.keyBy(_._1).join(totals).values

and flatten (with Shapeless):

import shapeless.syntax.std.tuple._

val result = vals.map { case (x, y) => x :+ y }
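If you would rather not add Shapeless as a dependency, the nested ((lang, id, name, max, min), total) pairs produced by the join can also be flattened with an ordinary pattern match. A minimal sketch; Record and flatten are hypothetical names, not part of the original answer.

```scala
// One row of the input RDD: (lang, id, name, max, min)
type Record = (String, String, String, Int, Int)

// Flatten ((lang, id, name, max, min), total) into a single 6-tuple
def flatten(pair: (Record, Long)): (String, String, String, Int, Int, Long) =
  pair match {
    case ((lang, id, name, max, min), total) => (lang, id, name, max, min, total)
  }

// In Spark this would be used as: val result = vals.map(flatten)
```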

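import spark.implicits._  // already in scope in spark-shell
result.toDF.show          // DataFrame used only to display the result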

with an output:

+---+---+-----+---+---+---+ 
| _1| _2|   _3| _4| _5| _6|
+---+---+-----+---+---+---+
| en|id1|name1|  5|  1|  7|
| en|id2|name2|  2|  0|  7|
| en|id4|name4|  7|  1|  7|
| de|id3|name3|  9|  2|  7|
| nl|id5|name5|  3|  0|  5|
| nl|id6|name6|  5|  1|  5|
+---+---+-----+---+---+---+

but for more complex aggregations this quickly becomes tedious, inefficient (the join has to shuffle the full original dataset on top of the aggregation), and hard to manage.
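When the number of distinct lang values is small, one alternative (not part of the original answer) is to collect the per-key totals to the driver with collectAsMap, broadcast them, and look each row's total up in a plain map, which avoids shuffling the full dataset for the join. This assumes the totals map fits comfortably in memory; the per-row logic is factored into a pure function, and Record and attachTotal are hypothetical names.

```scala
// One row of the input RDD: (lang, id, name, max, min)
type Record = (String, String, String, Int, Int)

// Append the precomputed total for the row's lang.
// collectAsMap() returns a scala.collection.Map, hence the parameter type.
def attachTotal(totals: scala.collection.Map[String, Long])(r: Record)
    : (String, String, String, Int, Int, Long) =
  (r._1, r._2, r._3, r._4, r._5, totals(r._1))

// Usage with Spark (not run here), assuming `totals` from the answer above:
//   val bTotals = sc.broadcast(totals.collectAsMap())
//   val result  = rdd.map(r => attachTotal(bTotals.value)(r))
```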