
Dataframe transformation

I am new to Spark and Scala. I have a dataframe with a huge amount of data. Let's call this dataframe empDF. Its schema looks like this:

id  name    emp_id  code  date
1   Andrew  D01     C101  2012-06-14
2   James   D02     C101  2013-02-26
3   James   D02     C102  2013-12-29
4   James   D02     C101  2010-09-27

I read this data from the database as a DataFrame object. Now I have to perform the following steps:

  1. Filter the dataframe and get only rows with code C101.

  2. Now on the filtered rows the level must be set. If there is no previous record for the employee, the level is set to 1. If there is a previous record that is more than two years older than the current record, the level is set to 2.

After the above two steps the resulting dataframe should look like this:

id  name    emp_id  code  date        level
1   Andrew  D01     C101  2012-06-14  1
2   James   D02     C101  2013-02-26  2
4   James   D02     C101  2010-09-27  1

Notice that the first row has level 1 because this employee has no history. The second row has level 2 because there is an older record for this employee and the date difference between the two rows is more than two years (2010-09-27 to 2013-02-26 is almost two and a half years). The last row has level 1 because there is no record with an earlier date.

How can I do that in Scala using the DataFrame API and its built-in functions?


You can use window functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Window over each employee's rows, ordered chronologically
val window = Window.partitionBy("name").orderBy("date")

// Date of the employee's previous row (null if there is none)
val lagCol = lag(col("date"), 1).over(window)

// Difference in days between the current row and the previous one
val diff = datediff(col("date"), col("previousDate"))

// Level 1 when there is no previous record, or when the previous
// record is at most two years (~730 days) older; level 2 otherwise
val level = when(col("previousDate").isNull || (diff <= 730), 1).otherwise(2)

val newDF = empDF
  .where(col("code") === "C101")
  .withColumn("previousDate", lagCol)
  .withColumn("level", level)
  .drop("previousDate") // helper column, not part of the expected output


+---+------+------+----+----------+-----+
| id|  name|emp_id|code|      date|level|
+---+------+------+----+----------+-----+
|  1|Andrew|   D01|C101|2012-06-14|    1|
|  2| James|   D02|C101|2013-02-26|    2|
|  4| James|   D02|C101|2010-09-27|    1|
+---+------+------+----+----------+-----+
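
One caveat: datediff with a 730-day cutoff only approximates "two years" (a two-year span containing a leap day is 731 days, so 2010-09-27 to 2012-09-27 would count as more than two years even though it is exactly two). If you need an exact calendar comparison, you could build the level from add_months instead; this variant is my sketch, not part of the original answer:

// Level 2 exactly when the previous record is more than 24 months older
val levelExact = when(
  col("previousDate").isNull || col("date") <= add_months(col("previousDate"), 24), 1
).otherwise(2)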