Lobsterrrr - 3 months ago
Scala Question

Group RDD entries by timestamps into the lists of entries covering X minutes

I have the following entries in RDD:

(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))


I want to group these entries into new entries, so that each new entry contains a list of old entries falling within 30 minutes of each other. It seems straightforward, but in practice my code does not do the trick.

val grouped = processed.reduceByKey((x,y) => x ++ y)
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) }

object MyFuncObj {

  def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
    l.groupBy(_.productElement(0).toString.toLong / 30*60).values
  }

}


After applying this code to the above data, I get the following result (I show only the timestamps, since they are the key point):

1473163143 1473163143 1473163148
1473163139
1473163696 1473163700 1473163703
1473168932


Since these timestamps are seconds, they should be grouped as follows:

1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703
1473168932


How can I solve this?

UPDATE:

To be more clear: I expect to get 30-minute buckets starting at the time of the first record.
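This bucketing rule can be sketched on plain timestamps, outside Spark (a minimal sketch, assuming second-resolution Long timestamps; the list combines the timestamps from the question with the outlier that appears in the result):

```scala
object BucketSketch extends App {
  // Timestamps (in seconds) from the question, plus the outlier from the result.
  val ts = List(1473163143L, 1473163143L, 1473163148L, 1473163139L,
                1473163696L, 1473163700L, 1473163703L, 1473168932L)

  val first = ts.min          // start of the first bucket
  val bucketSize = 30 * 60    // 30 minutes, in seconds

  // Bucket index = (delta from the earliest timestamp) / bucket size.
  val buckets = ts.groupBy(t => (t - first) / bucketSize)

  buckets.toList.sortBy(_._1).foreach { case (b, xs) =>
    println(s"bucket $b: ${xs.mkString(" ")}")
  }
  // The seven timestamps within 30 minutes of 1473163139 share bucket 0;
  // 1473168932 lands in bucket 3 (5793 seconds after the first record).
}
```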

Answer

There are two issues here:

  1. If you want the "buckets" to start at the time of the first entry, you should subtract that first timestamp from each timestamp (i.e., take the delta) before dividing

  2. Missing parentheses around 30*60 - you're dividing by 30 and then multiplying the result by 60, instead of dividing by (30*60):

    scala> 5000 / 30*60
    res0: Int = 9960
    
    scala> 5000 / (30*60)
    res1: Int = 2
    

Altogether - this seems to do what you need:

// sample data:
val processed = sc.parallelize(List(
  (111,List(List(1473163148L, "abc"))),
  (111,List(List(1473163143L,"def"))),
  (111,List(List(1473163143L,"abd"))),
  (111,List(List(1473163139L,"asd"))),
  (111,List(List(1473163696L,"rtf"))),
  (111,List(List(1473163700L,"rgd"))),
  (111,List(List(1473168932L,"dmf"))))
)


// first - find the lowest timestamp:
// if input isn't ordered:
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min()

// if input is sorted by timestamp:
val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong

def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
  // divide the DELTA between each timestamp and first one by 30 minutes to find bucket:
  l.groupBy(t => (firstTimestamp - t.productElement(0).toString.toLong) / (30*60)).values
}

// continue as you did:
import org.apache.spark.rdd.RDD

val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y)
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap {
  case (k, l) => createGroups(l).map(sublist => (k, sublist))
}

separated.foreach(println)
// prints:
// (111,List(List(1473168932, dmf)))
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd)))
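The grouping logic can also be checked locally without a Spark cluster. This is a sketch on plain Scala collections: head replaces productElement(0), and the delta is written as t - firstTimestamp, which produces the same buckets since firstTimestamp is the minimum.

```scala
// The sample records from the answer, as plain Scala collections.
val records: List[List[Any]] = List(
  List(1473163148L, "abc"), List(1473163143L, "def"),
  List(1473163143L, "abd"), List(1473163139L, "asd"),
  List(1473163696L, "rtf"), List(1473163700L, "rgd"),
  List(1473168932L, "dmf"))

// Lowest timestamp across all records.
val firstTimestamp: Long = records.map(_.head.toString.toLong).min

// Bucket each record by its 30-minute window relative to the first timestamp.
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] =
  l.groupBy(t => (t.head.toString.toLong - firstTimestamp) / (30 * 60)).values

// Expect two groups: six records in the first 30-minute window, "dmf" alone.
createGroups(records).foreach(println)
```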