Paul Reiners - 22 days ago
Scala Question

Spark SQL DataFrame transformation involving partitioning and lagging

I want to transform a Spark SQL DataFrame like this:

animal value
------------
cat 8
cat 5
cat 6
dog 2
dog 4
dog 3
rat 7
rat 4
rat 9


into a DataFrame like this:

animal value previous-value
-----------------------------
cat 8 0
cat 5 8
cat 6 5
dog 2 0
dog 4 2
dog 3 4
rat 7 0
rat 4 7
rat 9 4


I want to partition by animal and then, within each animal, have previous-value lag one row behind value (with a default value of 0), and then put the partitions back together again.

Answer

This can be accomplished using a window function: apply lag over a window that is partitioned by animal.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  ("cat", 8, "01:00"), ("cat", 5, "02:00"), ("cat", 6, "03:00"),
  ("dog", 2, "02:00"), ("dog", 4, "04:00"), ("dog", 3, "06:00"),
  ("rat", 7, "01:00"), ("rat", 4, "03:00"), ("rat", 9, "05:00")
)).toDF("animal", "value", "time")

df.show
+------+-----+-----+
|animal|value| time|
+------+-----+-----+
|   cat|    8|01:00|
|   cat|    5|02:00|
|   cat|    6|03:00|
|   dog|    2|02:00|
|   dog|    4|04:00|
|   dog|    3|06:00|
|   rat|    7|01:00|
|   rat|    4|03:00|
|   rat|    9|05:00|
+------+-----+-----+

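I've added a "time" field to illustrate orderBy. A DataFrame has no inherent row order, so lag needs the window to say how rows are ordered within each animal.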

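// One window per animal, with rows ordered by time inside each window
val w1 = Window.partitionBy($"animal").orderBy($"time")

// lag(col, 1) reads the value one row back in the window (null on the first row)
val previous_value = lag($"value", 1).over(w1)
val df1 = df.withColumn("previous", previous_value)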

df1.show
+------+-----+-----+--------+
|animal|value| time|previous|
+------+-----+-----+--------+
|   dog|    2|02:00|    null|
|   dog|    4|04:00|       2|
|   dog|    3|06:00|       4|
|   cat|    8|01:00|    null|
|   cat|    5|02:00|       8|
|   cat|    6|03:00|       5|
|   rat|    7|01:00|    null|
|   rat|    4|03:00|       7|
|   rat|    9|05:00|       4|
+------+-----+-----+--------+
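
The same lag can also be written as a SQL window expression. The sketch below is only an illustration, not part of the original answer: it assumes Spark 2.x or later (SparkSession and createOrReplaceTempView rather than sqlContext), and the view name animals, the object LagInSql and the helper withPrevious are names made up for this example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object LagInSql {
  // Hypothetical helper: registers df as a temp view and adds "previous" with SQL LAG.
  def withPrevious(spark: SparkSession, df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("animals") // view name is an assumption
    spark.sql(
      """SELECT animal, value, time,
        |       LAG(value, 1) OVER (PARTITION BY animal ORDER BY time) AS previous
        |FROM animals""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lag-in-sql")
      .getOrCreate()
    import spark.implicits._

    // Same sample data as in the answer
    val df = Seq(
      ("cat", 8, "01:00"), ("cat", 5, "02:00"), ("cat", 6, "03:00"),
      ("dog", 2, "02:00"), ("dog", 4, "04:00"), ("dog", 3, "06:00"),
      ("rat", 7, "01:00"), ("rat", 4, "03:00"), ("rat", 9, "05:00")
    ).toDF("animal", "value", "time")

    withPrevious(spark, df).show()
    spark.stop()
  }
}
```

Either way, the order in which the animals appear in the output is not guaranteed; add an explicit ORDER BY (or orderBy on the DataFrame) if the final row order matters.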

If you want to replace nulls with 0 (note that na.fill(0) fills nulls in every numeric column, not only in previous):

val df2 = df1.na.fill(0)
df2.show
+------+-----+-----+--------+
|animal|value| time|previous|
+------+-----+-----+--------+
|   dog|    2|02:00|       0|
|   dog|    4|04:00|       2|
|   dog|    3|06:00|       4|
|   cat|    8|01:00|       0|
|   cat|    5|02:00|       8|
|   cat|    6|03:00|       5|
|   rat|    7|01:00|       0|
|   rat|    4|03:00|       7|
|   rat|    9|05:00|       4|
+------+-----+-----+--------+
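
Since the question asks for a default value of 0, it may be simpler to give lag its optional third argument, which is the value used when there is no previous row. The sketch below shows that variant, plus a fill restricted to the previous column. It is an illustration under assumptions: it uses SparkSession (Spark 2.x or later) instead of sqlContext, and the object LagWithDefault and the helpers viaLagDefault and viaColumnFill are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

object LagWithDefault {
  // Window per animal, ordered by time (same window as w1 in the answer)
  private val w = Window.partitionBy(col("animal")).orderBy(col("time"))

  // Variant 1: lag's third argument supplies 0 when there is no previous row.
  def viaLagDefault(df: DataFrame): DataFrame =
    df.withColumn("previous", lag(col("value"), 1, 0).over(w))

  // Variant 2: two-argument lag, then fill nulls in the "previous" column only.
  def viaColumnFill(df: DataFrame): DataFrame =
    df.withColumn("previous", lag(col("value"), 1).over(w))
      .na.fill(0, Seq("previous"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lag-with-default")
      .getOrCreate()
    import spark.implicits._

    // Same sample data as in the answer
    val df = Seq(
      ("cat", 8, "01:00"), ("cat", 5, "02:00"), ("cat", 6, "03:00"),
      ("dog", 2, "02:00"), ("dog", 4, "04:00"), ("dog", 3, "06:00"),
      ("rat", 7, "01:00"), ("rat", 4, "03:00"), ("rat", 9, "05:00")
    ).toDF("animal", "value", "time")

    viaLagDefault(df).show()
    viaColumnFill(df).show()
    spark.stop()
  }
}
```

Both variants leave any nulls in the value column untouched, which a blanket na.fill(0) would not.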