Atif Shahzad Atif Shahzad - 1 year ago 171
Scala Question

Scala : Map and Flatmap on RDD

I have an RDD with this structure

RDD[((String, String), List[(Int, Timestamp, String)])]

and data

((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101)))
((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101)))
((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101), (2,2011-10-05 00:00:00.0,C101), (3,2006-12-25 00:00:00.0,C101)))

consider this as table means

'(D2,Saad Arif)'

is like key and

'List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101)'

is like rows for this key.
Now i want to check for each row that if there is record(history) with code 'C101' before two or more year then set level to 2 otherwise to 1. So the resulting RDD should look like this

((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101, 1), (5,2010-01-27 00:00:00.0,C101, 1)))
((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101, 1)))
((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101, 2), (2,2011-10-05 00:00:00.0,C101, 2), (3,2006-12-25 00:00:00.0,C101, 1)))

Notice new level after timestamp.How can i do this with map or flatmap?

Answer Source
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.Period    

val df1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")

val futureDate = LocalDate.parse("2100-01-01 00:00:00.0", df1)

val yourRequiredRdd = yourRdd
    case (t, list) => {
      val list1 ={
        case (id, dateStr, id2) => (id, LocalDate.parse(dateStr, df1), id2) 

      val oldestDate = list1
        .filter({ case (id, date, id2) => id2.equals("C101") })
        .foldLeft(futureDate)((oldestDate, date) => {
          val period = Period.between(oldestDate, date)
          if (!period.isNegative()) oldestDate else date

      val newList = list1
          case (id, date, "C101") => {
            val periodFromOldestDate = Period.between(oldestDate, date)
            val extraNumber = if (periodFromOldestDate.getYears() >= 2) 2 else 1
            (id, date, "C101", extraNumber)
          case (id, date, id2) => {
            (id, date, id2, 1)

      (t, newList)
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download