Atif Shahzad - 3 months ago
Scala Question

Dataframe transformation

I am new to Spark and Scala. I have a dataframe with a huge amount of data, and its schema looks like this.

Let's call this dataframe empDF:

id  name    emp_id  code  date
1   Andrew  D01     C101  2012-06-14
2   James   D02     C101  2013-02-26
3   James   D02     C102  2013-12-29
4   James   D02     C101  2010-09-27


I read this data from the database as a DataFrame[Row] object. Now I have to perform the following steps:


  1. Filter the dataframe and keep only the rows with code C101.

  2. On the filtered rows, set the level. If an employee has no earlier record, the level is set to 1. If there is an earlier record that is two or more years older than the current one, the level is set to 2.



After the above two steps the resulting dataframe should look like this:

id  name    emp_id  code  date        level
1   Andrew  D01     C101  2012-06-14  1
2   James   D02     C101  2013-02-26  2
4   James   D02     C101  2010-09-27  1


Notice that the first row has level 1 because this employee has no history. The second row has level 2 because there is an older record for this employee and the date difference between the two rows is more than two years. The last row has level 1 because there is no earlier record for it.
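
For reference, the gap between James's two C101 rows works out to 883 days, so it crosses the two-year mark; a quick check (an illustrative snippet, not part of the dataframe logic):

import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Day count between the two James C101 dates from the table above.
val days = ChronoUnit.DAYS.between(
  LocalDate.parse("2010-09-27"), LocalDate.parse("2013-02-26"))
println(days) // 883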

How can I do this in Scala using the DataFrame API and functions like map, flatMap, reduce, etc.?

Answer

You can use window functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One window per employee, ordered by date, so lag() can look at the previous record.
val window = Window.partitionBy("name").orderBy("date")
val lagCol = lag(col("date"), 1).over(window)

// Days between the current record and the previous one (null when there is none).
val diff = datediff(col("date"), col("previousDate"))

// Level 1 when there is no previous record or it is at most ~2 years (730 days) older,
// level 2 otherwise.
val level = when(
  col("previousDate").isNull || (diff <= 730), 1
).otherwise(2)

val newDF = empDF
  .where(col("code") === "C101")      // step 1: keep only code C101
  .withColumn("previousDate", lagCol) // date of the employee's previous C101 record
  .withColumn("level", level)         // step 2: derive the level
  .drop("previousDate")               // remove the helper column

newDF.orderBy("id").show

+---+------+------+----+----------+-----+
| id|  name|emp_id|code|      date|level|
+---+------+------+----+----------+-----+
|  1|Andrew|   D01|C101|2012-06-14|    1|
|  2| James|   D02|C101|2013-02-26|    2|
|  4| James|   D02|C101|2010-09-27|    1|
+---+------+------+----+----------+-----+
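
A note on the thresholds: diff <= 730 approximates two years as 730 days, and the window is partitioned by name, which is fine for this sample but would merge different employees who happen to share a name. If emp_id uniquely identifies an employee and you want an exact calendar comparison, a sketch using months_between (same idea, only the key and the date arithmetic change) could look like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sketch: partition by emp_id and use months_between for an exact
// "two or more years" test instead of a fixed day count.
val byEmployee = Window.partitionBy("emp_id").orderBy("date")
val prev = lag(col("date"), 1).over(byEmployee)

val levelExact = when(
  prev.isNull || months_between(col("date"), prev) < 24, 1
).otherwise(2)

empDF
  .where(col("code") === "C101")
  .withColumn("level", levelExact)
  .orderBy("id")
  .show()

Both versions produce the same result for the sample data above.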