Laurens van Dam Laurens van Dam - 2 months ago 10
Scala Question

Spark reduce and aggregate on same data-set

I have a text file which I read and then split using the

split
operation. This results in an RDD with
Array(A, B, C, D, E, F, G, H, I)
.

I would like to find
max(F) - min(G)
for every key
E
(reduce by key
E
). Then I want to combine the resulting values by key
C
and concatenate this sum result for every row with the same key.

For example:

+--+--+--+--+
| C| E| F| G|
+--+--+--+--+
|en| 1| 3| 1|
|en| 1| 4| 0|
|nl| 2| 1| 1|
|nl| 2| 5| 2|
|nl| 3| 9| 3|
|nl| 3| 6| 4|
|en| 4| 9| 1|
|en| 4| 2| 1|
+-----------+


Should result in

+--+--+-------------+---+
| C| E|max(F)-min(G)|sum|
+--+--+-------------+---+
|en| 1| 4 |12 |
|nl| 2| 4 |10 |
|nl| 3| 6 |10 |
|en| 4| 8 |12 |
+--+--+-------------+---+


What would be the best way to tackle this? Currently I am trying to perform the
max(F)-min(G)
by running

val maxCounts = logEntries.map(line => (line(4), line(5).toLong)).reduceByKey((x, y) => math.max(x, y))
val minCounts = logEntries.map(line => (line(4), line(6).toLong)).reduceByKey((x, y) => math.min(x, y))

val maxMinCounts = maxCounts.join(minCounts).map{ case(id, maxmin) => (id, (maxmin._1 - maxmin._2)) }


And then
join
the resulting RDDs. However, this becomes tricky when I also want to sum these values and append them to my existing data set.

I would love to hear any suggestions!

Answer

This kind of logic is easily implemented in the dataframe API (also). But you need to explicitly form your columns from the array:

val window = Window.partitionBy('v)

val df = rdd
  .map { case Array(_, _, c, _, e, f, g, _, _) => (c,e,f,g) }
  .toDF("C","E","F","G")
  .groupBy('C,'E)
  .agg((max('F) - min('G)).as("diff"))
  .withColumn("sum",sum(diff).over(window))