Laurens van Dam - 1 year ago
Scala Question

Spark reduce and aggregate on same data-set

I have a text file which I read and then split using the split operation. This results in an RDD whose rows have the form Array(A, B, C, D, E, F, G, H, I).

I would like to find max(F) - min(G) for every key E (reduce by key E). Then I want to sum those per-E results by key C and append that sum to every row with the same key C.

For example:

+--+--+--+--+
| C| E| F| G|
+--+--+--+--+
|en| 1| 3| 1|
|en| 1| 4| 0|
|nl| 2| 1| 1|
|nl| 2| 5| 2|
|nl| 3| 9| 3|
|nl| 3| 6| 4|
|en| 4| 9| 1|
|en| 4| 2| 1|
+--+--+--+--+


This should result in the following, where sum is the total of max(F)-min(G) over all rows sharing the same C:

+--+--+-------------+---+
| C| E|max(F)-min(G)|sum|
+--+--+-------------+---+
|en| 1|            4| 12|
|nl| 2|            4| 10|
|nl| 3|            6| 10|
|en| 4|            8| 12|
+--+--+-------------+---+


What would be the best way to tackle this? Currently I am trying to compute max(F) - min(G) by running:

// max(F) per key E (zero-indexed array: E = line(4), F = line(5), G = line(6))
val maxCounts = logEntries.map(line => (line(4), line(5).toLong)).reduceByKey((x, y) => math.max(x, y))
// min(G) per key E
val minCounts = logEntries.map(line => (line(4), line(6).toLong)).reduceByKey((x, y) => math.min(x, y))

// join the two RDDs on E and compute max(F) - min(G)
val maxMinCounts = maxCounts.join(minCounts).map { case (id, (maxF, minG)) => (id, maxF - minG) }


I then join the resulting RDDs. However, this becomes tricky when I also want to sum these values and append them to my existing data set.
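For completeness, one way to push this RDD-only approach through would be to key by the pair (C, E) from the start, so that C is still available for the per-C sum. This is only a sketch, assuming logEntries is an RDD[Array[String]] with C at index 2, E at index 4, F at index 5 and G at index 6, as in the layout above:

// max(F) - min(G) per (C, E), computed in a single reduceByKey pass
val diffs = logEntries
  .map(line => ((line(2), line(4)), (line(5).toLong, line(6).toLong)))
  .reduceByKey { case ((maxF1, minG1), (maxF2, minG2)) =>
    (math.max(maxF1, maxF2), math.min(minG1, minG2))
  }
  .mapValues { case (maxF, minG) => maxF - minG }

// total of those differences per C
val sumsPerC = diffs.map { case ((c, _), diff) => (c, diff) }.reduceByKey(_ + _)

// append the per-C sum to every (C, E) result
val result = diffs
  .map { case ((c, e), diff) => (c, (e, diff)) }
  .join(sumsPerC)
  .map { case (c, ((e, diff), sum)) => (c, e, diff, sum) }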

I would love to hear any suggestions!

Answer

This kind of logic is also easy to implement in the DataFrame API, but you need to explicitly form your columns from the array:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min, sum}
import spark.implicits._  // spark is the active SparkSession; needed for toDF and the 'symbol column syntax

// window over all rows sharing the same C, used to add the per-C total
val window = Window.partitionBy('C)

val df = rdd
  .map { case Array(_, _, c, _, e, f, g, _, _) => (c, e, f.toLong, g.toLong) }  // keep F and G numeric
  .toDF("C", "E", "F", "G")
  .groupBy('C, 'E)
  .agg((max('F) - min('G)).as("diff"))
  .withColumn("sum", sum('diff).over(window))