Newbie Newbie - 1 month ago
Scala Question

Spark: How to get the day difference between a DataFrame column and a timestamp in Scala

How do I get the date difference (number of days in between) in a Scala DataFrame?

I have a DataFrame:

[id: string, itemName: string, eventTimeStamp: timestamp]
and a startTime (a timestamp string). How do I get a column "Daydifference" holding the number of days between them, i.e. (startTime - eventTimeStamp)?
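To pin down what "number of days between" means, here is a minimal plain-Scala sketch (not Spark) using `java.time`. It assumes both values are ISO-8601 instants like the ones in the question; the object and method names are hypothetical. `ChronoUnit.DAYS.between` counts whole 24-hour periods, whereas Spark's `datediff` compares calendar dates, so the two agree for midnight timestamps such as these.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object DayDifference {
  // Whole 24-hour days from eventTime up to startTime (negative if eventTime is later).
  // Both arguments are assumed to be ISO-8601 instants such as "2016-09-29T00:00:00Z".
  def daysBetween(startTime: String, eventTime: String): Long =
    ChronoUnit.DAYS.between(Instant.parse(eventTime), Instant.parse(startTime))

  def main(args: Array[String]): Unit =
    println(daysBetween("2016-09-29T00:00:00Z", "2016-09-19T00:00:00Z")) // 10
}
```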

My code:

Initial df:

+---+--------+--------------------+
| id|itemName|      eventTimeStamp|
+---+--------+--------------------+
|  1|      TV|2016-09-19T00:00:00Z|
|  1|   Movie|2016-09-19T00:00:00Z|
|  1|      TV|2016-09-26T00:00:00Z|
|  2|      TV|2016-09-18T00:00:00Z|
+---+--------+--------------------+


I need the most recent eventTimeStamp for each (id, itemName) pair, so I did:

val result = df.groupBy("id", "itemName").agg(max("eventTimeStamp") as "mostRecent")
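The aggregation above keeps, for every (id, itemName) key, the latest timestamp. As a rough illustration of what `groupBy(...).agg(max(...))` computes, here is the same logic on a local Scala collection (plain Scala, not Spark), applied to the four rows of the initial table. The `Event` case class and `MostRecentSketch` object are hypothetical names.

```scala
import java.time.Instant

object MostRecentSketch {
  final case class Event(id: String, itemName: String, eventTimeStamp: Instant)

  // The four rows of the initial table
  val initialRows: Seq[Event] = Seq(
    Event("1", "TV", Instant.parse("2016-09-19T00:00:00Z")),
    Event("1", "Movie", Instant.parse("2016-09-19T00:00:00Z")),
    Event("1", "TV", Instant.parse("2016-09-26T00:00:00Z")),
    Event("2", "TV", Instant.parse("2016-09-18T00:00:00Z"))
  )

  // Local-collection analogue of df.groupBy("id", "itemName").agg(max("eventTimeStamp") as "mostRecent")
  def mostRecent(events: Seq[Event]): Map[(String, String), Instant] =
    events
      .groupBy(e => (e.id, e.itemName))
      .map { case (key, group) => key -> group.map(_.eventTimeStamp).maxBy(_.toEpochMilli) }

  def main(args: Array[String]): Unit =
    mostRecent(initialRows).toSeq.sortBy(_._1).foreach(println)
}
```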


+---+--------+--------------------+
| id|itemName|          mostRecent|
+---+--------+--------------------+
|  1|      TV|2016-09-26T00:00:00Z|
|  1|   Movie|2016-09-19T00:00:00Z|
|  2|      TV|2016-09-26T00:00:00Z|
+---+--------+--------------------+


Now I need the date difference between mostRecent and startTime (2016-09-29T00:00:00Z), so that I can get:

{ id : 1, {"itemMap" : {"TV" : 3, "Movie" : 10 }} }
{ id : 2, {"itemMap" : {"TV" : 3}} }
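The target output is, per id, a map from itemName to the number of days between mostRecent and startTime. A minimal plain-Scala sketch (not Spark) that reproduces those numbers from the mostRecent table, assuming startTime is 2016-09-29T00:00:00Z as stated; `ItemMapSketch` and `itemMaps` are hypothetical names.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object ItemMapSketch {
  // (id, itemName, mostRecent) rows, copied from the mostRecent table
  val mostRecentRows: Seq[(String, String, String)] = Seq(
    ("1", "TV", "2016-09-26T00:00:00Z"),
    ("1", "Movie", "2016-09-19T00:00:00Z"),
    ("2", "TV", "2016-09-26T00:00:00Z")
  )

  // For each id, map itemName -> whole days from mostRecent up to startTime
  def itemMaps(startTime: String, rows: Seq[(String, String, String)]): Map[String, Map[String, Long]] = {
    val start = Instant.parse(startTime)
    rows
      .groupBy(_._1)
      .map { case (id, group) =>
        id -> group.map { case (_, item, ts) =>
          item -> ChronoUnit.DAYS.between(Instant.parse(ts), start)
        }.toMap
      }
  }

  def main(args: Array[String]): Unit =
    println(itemMaps("2016-09-29T00:00:00Z", mostRecentRows))
}
```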


I tried this:

val startTime = "2016-09-26T00:00:00Z"

val result = df.groupBy("id", "itemName").agg(datediff(startTime, max("eventTimeStamp")) as Daydifference)


case class Data(itemMap: Map[String, Long]) extends Serializable


result.rdd.map { r =>
  val id = r.getAs[String]("id")
  val itemName = r.getAs[String]("itemName")
  // datediff produces an Int column, so read it as Int and widen to Long
  val daydifference = r.getAs[Int]("Daydifference").toLong

  (id, Map(itemName -> daydifference))
}.reduceByKey((x, y) => x ++ y).map {
  case (k, v) =>
    (k, JacksonUtil.toJson(Data(v)))
}
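The `reduceByKey((x, y) => x ++ y)` step merges the single-entry maps that share an id. The following plain-Scala sketch (not Spark) mimics that merge on a local collection; `ReduceByKeySketch`, `mergeByKey`, and the sample pairs are hypothetical, with the pair values taken from the desired output. Because the earlier groupBy makes each (id, itemName) unique, the "right-hand value wins" behaviour of `++` on duplicate keys never comes into play here.

```scala
object ReduceByKeySketch {
  // Local analogue of rdd.reduceByKey((x, y) => x ++ y): all maps that share a key are merged.
  // If two maps contain the same itemName, ++ keeps the value from the right-hand map.
  def mergeByKey[K, V](pairs: Seq[(K, Map[String, V])]): Map[K, Map[String, V]] =
    pairs
      .groupBy(_._1)
      .map { case (k, group) => k -> group.map(_._2).reduce(_ ++ _) }

  // Hypothetical (id, Map(itemName -> Daydifference)) pairs, as produced by the first map step
  val pairs: Seq[(String, Map[String, Long])] = Seq(
    ("1", Map("TV" -> 3L)),
    ("1", Map("Movie" -> 10L)),
    ("2", Map("TV" -> 3L))
  )

  def main(args: Array[String]): Unit =
    println(mergeByKey(pairs))
}
```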


But I get an error on datediff. Can someone tell me how to achieve this?

Answer

When you want to use a constant ("literal") value as a Column in a DataFrame, you should use the lit(...) function. The other error here is passing a String as the start time; to compare it to a timestamp column, you can use a java.sql.Date:

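import org.apache.spark.sql.functions.{datediff, lit, max}

// valueOf parses yyyy-mm-dd; the deprecated Date(int, int, int) constructor expects (year - 1900) and a zero-based month
val startTime = java.sql.Date.valueOf("2016-09-26")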

val result = df.groupBy("id", "itemName")
  .agg(datediff(lit(startTime), max("eventTimeStamp")) as "Daydifference")

result.show()
//  +---+--------+-------------+
//  | id|itemName|Daydifference|
//  +---+--------+-------------+
//  |  1|   Movie|            7|
//  |  1|      TV|            0|
//  |  2|      TV|            0|
//  +---+--------+-------------+
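If you would rather keep startTime as a string, a variation on the same idea is to wrap it with `lit` and convert it with `to_date`, both of which are functions in `org.apache.spark.sql.functions`. The sketch below assumes Spark 2.x or later (for `SparkSession`), a local master, the four sample rows from the question, and a start date in `yyyy-MM-dd` form; `DayDifferenceSketch` and `itemMaps` are hypothetical names. It collects the aggregated result to the driver to build the per-id map, which is reasonable only when that result is small.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, datediff, lit, max, to_date}

object DayDifferenceSketch {
  // Hypothetical helper: returns id -> (itemName -> days since the most recent event)
  def itemMaps(spark: SparkSession, startTime: String): Map[String, Map[String, Int]] = {
    import spark.implicits._

    // Sample rows from the question; the strings are cast to TimestampType
    val df = Seq(
      ("1", "TV", "2016-09-19 00:00:00"),
      ("1", "Movie", "2016-09-19 00:00:00"),
      ("1", "TV", "2016-09-26 00:00:00"),
      ("2", "TV", "2016-09-18 00:00:00")
    ).toDF("id", "itemName", "eventTimeStamp")
      .withColumn("eventTimeStamp", col("eventTimeStamp").cast("timestamp"))

    // lit wraps the constant string in a Column; to_date turns "yyyy-MM-dd" into a DateType value
    val result = df.groupBy("id", "itemName")
      .agg(datediff(to_date(lit(startTime)), max("eventTimeStamp")) as "Daydifference")

    // datediff produces an IntegerType column; collect the (small) result and nest it per id
    result.collect()
      .map(r => (r.getAs[String]("id"), r.getAs[String]("itemName"), r.getAs[Int]("Daydifference")))
      .groupBy(_._1)
      .map { case (id, rows) => id -> rows.map { case (_, item, days) => item -> days }.toMap }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("day-difference-sketch").getOrCreate()
    try itemMaps(spark, "2016-09-29").toSeq.sortBy(_._1).foreach(println)
    finally spark.stop()
  }
}
```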