Prakash Khandelwal - 3 months ago

Apache Spark RDD : How to get latest data based on Paired RDD key and value

I am reading data from HDFS. There are multiple rows for each user, and I have to select the latest row for every user.

Row example (RDD[(Id: Int, DateTime: String, Name: String)]):

1,2016-05-01 01:01:01,testa
2,2016-05-02 01:01:01,testb
1,2016-05-05 01:01:01,testa


In the example above there are two rows with Id=1, but I want each id only once (the latest row and its corresponding data). I want an output RDD like below:

2,2016-05-02 01:01:01,testb
1,2016-05-05 01:01:01,testa


My Idea

I could collect this data into an array and run a for loop to get the desired result, keeping only the latest row for every user, as sketched below.
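A minimal sketch of that idea (rdd is a hypothetical RDD[(Int, String, String)]; the fixed-width "yyyy-MM-dd HH:mm:ss" format sorts lexicographically, so plain string comparison finds the later row):

val latest = scala.collection.mutable.Map.empty[Int, (Int, String, String)]
for (row <- rdd.collect()) {  // collect() pulls every row to the driver
  val (id, dateTime, _) = row
  if (!latest.contains(id) || latest(id)._2 < dateTime) latest(id) = row
}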

I have read that collect brings all the data to the master node. My data is 30 GB and the RAM on the master is 25 GB, so I don't want to try this.

Can you share your ideas and code to accomplish this task?

Answer

Convert your date string to a timestamp, then aggregate by id, selecting the tuple with the most recent timestamp.

import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset}

import org.apache.spark.rdd.RDD

val yourRdd: RDD[(Int, String, String)] = sc.parallelize(List(
  (1, "2016-05-01 01:01:01", "testa"),
  (2, "2016-05-02 01:01:01", "testb"),
  (1, "2016-05-05 01:01:01", "testa")
))

// The pattern must match the data: colons between the time fields.
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

// Neutral element for aggregateByKey: any real row has a later timestamp.
val zeroVal = (0, Long.MinValue, "", "")

val rddWithTimestamp = yourRdd
  .map({
    case (id, datetimeStr, name) =>
      // A LocalDateTime has no zone, so fix one (UTC here) to get epoch millis.
      val timestamp: Long = LocalDateTime.parse(datetimeStr, dateFormatter)
        .toInstant(ZoneOffset.UTC).toEpochMilli

      // Key by id; keep the id in the value so the full row survives.
      (id, (id, timestamp, datetimeStr, name))
  })

val yourRequiredRdd = rddWithTimestamp
  .aggregateByKey(zeroVal)(
    // seqOp and combOp both keep the tuple with the larger timestamp (_2).
    (t1, t2) => if (t1._2 > t2._2) t1 else t2,
    (t1, t2) => if (t1._2 > t2._2) t1 else t2
  )
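
yourRequiredRdd is an RDD[(Int, (Int, Long, String, String))]. To get back the original row shape, you could drop the key and the helper timestamp, for example:

val latestRows = yourRequiredRdd
  .map({ case (_, (id, _, datetimeStr, name)) => (id, datetimeStr, name) })

latestRows.collect().foreach(println)
// prints, in some order:
// (2,2016-05-02 01:01:01,testb)
// (1,2016-05-05 01:01:01,testa)

Since every key has at least one real row, the zero value is never strictly needed; reduceByKey is a simpler alternative under the same assumptions:

val latestByKey = rddWithTimestamp
  .reduceByKey((t1, t2) => if (t1._2 > t2._2) t1 else t2)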