Ninja Ninja - 1 year ago
Scala Question

How to replace values in Spark DataFrames after recalculation?

I have the following schema in Spark:

root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)


I have extracted utcacquisitiontime and datatimezone from the above schema as follows:

val result=q.selectExpr("explode(dailydata) as r").select("r.utcacquisitiontime","r.datatimezone")

+--------------------+------------+
| utcacquisitiontime|datatimezone|
+--------------------+------------+
|2017-03-27T22:00:00Z| +02:00|
|2017-03-27T22:15:00Z| +02:00|
|2017-03-27T22:30:00Z| +02:00|
|2017-03-27T22:45:00Z| +02:00|
|2017-03-27T23:00:00Z| +02:00|
|2017-03-27T23:15:00Z| +02:00|
|2017-03-27T23:30:00Z| +02:00|
|2017-03-27T23:45:00Z| +02:00|
|2017-03-28T00:00:00Z| +02:00|
|2017-03-28T00:15:00Z| +02:00|
|2017-03-28T00:30:00Z| +02:00|
|2017-03-28T00:45:00Z| +02:00|
|2017-03-28T01:00:00Z| +02:00|
|2017-03-28T01:15:00Z| +02:00|
|2017-03-28T01:30:00Z| +02:00|
|2017-03-28T01:45:00Z| +02:00|
|2017-03-28T02:00:00Z| +02:00|
|2017-03-28T02:15:00Z| +02:00|
|2017-03-28T02:30:00Z| +02:00|
|2017-03-28T02:45:00Z| +02:00|
+--------------------+------------+


I need to calculate localtime from these two columns and replace them with the computed localtime. How can I calculate localtime and do the replacement?

Answer

You can rely on a UDF (user-defined function) in Spark. The org.apache.spark.sql.functions._ object also provides plenty of predefined functions that might help you. But here is how you can make this work with a UDF, starting from this sample input:
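Whichever route you take, the core conversion is small. The sketch below shows it in plain Scala with java.time, assuming the UTC column keeps its trailing "Z" (as in the question's data) and the zone column always holds a fixed offset such as "+02:00"; the object and method names are hypothetical. A function like this is what you would wrap with udf. Spark's functions object also has built-ins such as from_utc_timestamp for this kind of shift, but check your Spark version's documentation for which zone formats it accepts.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}

object UtcToLocalSketch {
  // Converts a UTC instant such as "2017-03-27T22:00:00Z" into the
  // wall-clock (local) time at a fixed offset such as "+02:00".
  def utcToLocal(utc: String, offset: String): LocalDateTime = {
    val instant = Instant.parse(utc.trim)  // trailing "Z" marks the value as UTC
    val zone = ZoneOffset.of(offset.trim)  // fixed offsets like "+02:00" or "-05:30"
    LocalDateTime.ofInstant(instant, zone) // same instant, seen at that offset
  }

  def main(args: Array[String]): Unit = {
    // A few rows from the question's sample output
    Seq("2017-03-27T22:00:00Z", "2017-03-27T23:45:00Z", "2017-03-28T00:15:00Z")
      .foreach(t => println(s"$t -> ${utcToLocal(t, "+02:00")}"))
  }
}
```

For the first row this yields 2017-03-28T00:00: the local date rolls over to the next day, which is easy to miss when checking the results by eye.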

+-------------------+------------+
| utcacquisitiontime|datatimezone|
+-------------------+------------+
|2017-03-27T22:00:00|      +02:00|
+-------------------+------------+

Note that I have removed the trailing "Z" from the time column. Using the Joda-Time library, define a UDF like this:

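import org.apache.spark.sql.functions.{col, udf}
import org.joda.time.DateTimeZone
import org.joda.time.format.DateTimeFormat

// Parses `time` as wall-clock time in `zone` (e.g. "+02:00") and returns a java.sql.Timestamp
val toTimestamp = udf((time: String, zone: String) => {
  val timezone = DateTimeZone.forID(zone)
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss") // MM = month, mm = minutes
  new java.sql.Timestamp(fmt.withZone(timezone).parseDateTime(time).getMillis)
})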
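The UDF above reads the string as wall-clock time in the given zone, so "2017-03-27T22:00:00" with "+02:00" becomes an instant at 20:00 UTC. If the column really holds UTC (as its name and the original "Z" suggest), local time needs the opposite step: read the value as UTC, then shift it to the offset. The plain java.time sketch below contrasts the two readings; the names are hypothetical. Its pattern uses uppercase MM for the month because lowercase mm means minute-of-hour in both Joda-Time and java.time patterns, which is likely why the sample output shows 2017-01-27.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object ZoneReadingSketch {
  // MM = month of year; mm would be minute of hour
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")

  // Reading used by the UDF: the string is local time *at* the offset.
  def asLocalAtOffset(time: String, offset: String): Instant =
    LocalDateTime.parse(time, fmt).toInstant(ZoneOffset.of(offset))

  // Reading needed for local time: the string is UTC, shifted to the offset.
  def asUtcShiftedTo(time: String, offset: String): LocalDateTime =
    LocalDateTime.parse(time, fmt)
      .atOffset(ZoneOffset.UTC)
      .withOffsetSameInstant(ZoneOffset.of(offset))
      .toLocalDateTime

  def main(args: Array[String]): Unit = {
    println(asLocalAtOffset("2017-03-27T22:00:00", "+02:00")) // 2017-03-27T20:00:00Z
    println(asUtcShiftedTo("2017-03-27T22:00:00", "+02:00"))  // 2017-03-28T00:00
  }
}
```

The two results differ by four hours, so it is worth confirming which meaning the source data has before picking one.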

Apply it to the two columns with withColumn:

df.withColumn("timestamp", toTimestamp(col("utcacquisitiontime"), col("datatimezone")))
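withColumn adds the new column next to the two inputs; to actually replace them, as the question asks, you can follow it with drop("utcacquisitiontime", "datatimezone"), since Dataset.drop accepts several column names in Spark 2.x and later. The row-level effect of that replacement is sketched below in plain Scala, with hypothetical case classes standing in for DataFrame rows and the conversion done in the UTC-to-offset direction (assuming the UTC strings still carry their "Z").

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}

object ReplaceColumnsSketch {
  // Hypothetical stand-ins for a row before and after the replacement
  final case class Reading(utcacquisitiontime: String, datatimezone: String)
  final case class LocalReading(localtime: LocalDateTime)

  // Replaces the two source fields by a single computed localtime field
  def replaceWithLocal(rows: Seq[Reading]): Seq[LocalReading] =
    rows.map { r =>
      val instant = Instant.parse(r.utcacquisitiontime.trim)
      LocalReading(LocalDateTime.ofInstant(instant, ZoneOffset.of(r.datatimezone.trim)))
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Reading("2017-03-27T22:00:00Z", "+02:00"),
      Reading("2017-03-28T02:45:00Z", "+02:00")
    )
    replaceWithLocal(sample).foreach(println)
  }
}
```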

Showing the result and its schema (note that the timestamp column has type timestamp, so you can perform date operations on it):

+-------------------+------------+--------------------+
| utcacquisitiontime|datatimezone|           timestamp|
+-------------------+------------+--------------------+
|2017-03-27T22:00:00|      +02:00|2017-01-27 22:00:...|
+-------------------+------------+--------------------+

root
 |-- utcacquisitiontime: string (nullable = true)
 |-- datatimezone: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)