Yash, 4 years ago
Scala Question

Spark DataFrame/Dataset: Generic conditional cumulative sum

I have a DataFrame with a few attribute columns (c1, c2), an offset (in days), and a few value columns (v1, v2).

val inputDF = spark.sparkContext.parallelize(Seq(
  (1, 2, 30, 100, -1), (1, 2, 30, 100, 0), (1, 2, 30, 100, 1),
  (11, 21, 30, 100, -1), (11, 21, 30, 100, 0), (11, 21, 30, 100, 1)
), 10).toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]

scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
|  1|  2| 30|100|    -1|
|  1|  2| 30|100|     0|
|  1|  2| 30|100|     1|
| 11| 21| 30|100|    -1|
| 11| 21| 30|100|     0|
| 11| 21| 30|100|     1|
+---+---+---+---+------+


I need to calculate the cumulative sum of v1 and v2 within each (c1, c2) group, ordered by offset.

I tried the following, but it is far from a generic solution that could work on any DataFrame.

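import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Columns that define each group, and the ordering within a group
val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

// Partition by (c1, c2) and order by offset, so sum(...) becomes a running total
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))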

+---+---+---+---+------+-------------+-------------+
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v2|
+---+---+---+---+------+-------------+-------------+
|  1|  2| 30|100|    -1|           30|          100|
|  1|  2| 30|100|     0|           60|          200|
|  1|  2| 30|100|     1|           90|          300|
| 11| 21| 30|100|    -1|           30|          100|
| 11| 21| 30|100|     0|           60|          200|
| 11| 21| 30|100|     1|           90|          300|
+---+---+---+---+------+-------------+-------------+


The challenge is that [a] I need to do this across multiple, varying offset windows, such as (-1 to 1), (-10 to 10), (-30 to 30), or any others, and [b] I need to use this function across multiple DataFrames/Datasets, so I'm hoping for a generic function that works with either RDDs or Datasets.

Any thoughts on how I could achieve this in Spark 2.0?

Help is much appreciated. Thanks!

Answer Source

Another generic way to solve this is to use foldLeft, as explained here: https://stackoverflow.com/a/44532867/7059145
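
For illustration, the foldLeft idea could look roughly like the sketch below. This is not the code from the linked answer: the object name CumulativeSum, the helper withCumulativeSums, and its parameters are hypothetical. It also assumes that an "offset window" such as (-1 to 1) means keeping only the rows whose offset falls in that range before summing, which the question does not spell out.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object CumulativeSum {
  // Hypothetical helper: adds one "cumulative_<name>" column for each
  // entry in valueCols, for any DataFrame that has the named columns.
  def withCumulativeSums(
      df: DataFrame,
      groupCols: Seq[String],
      orderCol: String,
      valueCols: Seq[String],
      offsetRange: Option[(Int, Int)] = None): DataFrame = {

    // Assumption: an offset window (lo, hi) restricts the input to rows
    // whose orderCol value lies in [lo, hi], both ends inclusive.
    val filtered = offsetRange.fold(df) { case (lo, hi) =>
      df.filter(col(orderCol).between(lo, hi))
    }

    // Same window as in the question. With orderBy and no explicit frame,
    // Spark sums from the start of the partition up to the current row
    // (rows tied on orderCol are summed together).
    val w = Window
      .partitionBy(groupCols.map(col): _*)
      .orderBy(col(orderCol))

    // foldLeft threads the DataFrame through one withColumn per value column.
    valueCols.foldLeft(filtered) { (acc, name) =>
      acc.withColumn(s"cumulative_$name", sum(col(name)).over(w))
    }
  }
}
```

With the inputDF from the question, a call for the (-1 to 1) window might then be:

```scala
val outputDF = CumulativeSum.withCumulativeSums(
  inputDF, Seq("c1", "c2"), "offset", Seq("v1", "v2"), Some((-1, 1)))
```

Because the column names are plain strings, the same helper could be reused on other DataFrames, and called once per offset window as needed.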
