Lobsterrrr - 1 year ago 47

Scala Question

I have the following entries in RDD:

```
(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))
```

I want to group these entries into new entries, so that each new entry contains a list of the old entries that fall within the same 30-minute window. It seems straightforward, but in practice my code does not do the trick.

```
val grouped = processed.reduceByKey((x, y) => x ++ y)

val separated = grouped.flatMap { case (k, l) =>
  MyFuncObj.createGroups(l).map(sublist => (k, sublist))
}

object MyFuncObj {
  def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
    l.groupBy(_.productElement(0).toString.toLong / 30*60).values
  }
}
```

After applying this code to the above data, I get the following result (I only provide the timestamps because this is the key point):

```
1473163143 1473163143 1473163148
1473163139
1473163696 1473163700 1473163703
1473168932
```

Since these timestamps are in seconds, they should be grouped as follows:

```
1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703
1473168932
```

How to solve this task?

To be more clear: I expect to get 30-minute buckets starting at the time of the first record.
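The desired bucketing can be sketched without Spark: subtract the first timestamp from each timestamp and integer-divide by the window size. A minimal standalone sketch, using the timestamps above:

```scala
// Standalone sketch of the desired bucketing (plain Scala, no Spark).
// Bucket index = (timestamp - first timestamp) / window, with a 30-minute window.
val timestamps = List(1473163143L, 1473163143L, 1473163148L, 1473163139L,
                      1473163696L, 1473163700L, 1473163703L, 1473168932L)
val first = timestamps.min                          // 1473163139
val buckets = timestamps.groupBy(ts => (ts - first) / (30 * 60))
// The seven nearby timestamps all land in bucket 0; 1473168932 lands in bucket 3.
println(buckets.keys.toList.sorted)                 // List(0, 3)
```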

Answer Source

There are two issues here:

1. If you want the "buckets" to start at the time of the first entry, you should use the **delta** between each timestamp and that first timestamp before you make the division.
2. Missing parentheses around `30*60` - you're dividing by 30 and *then* multiplying that result by 60, instead of dividing by `(30*60)`:

```
scala> 5000 / 30*60
res0: Int = 9960

scala> 5000 / (30*60)
res1: Int = 2
```

Altogether - this seems to do what you need:

```
import org.apache.spark.rdd.RDD  // needed for the RDD type annotations below

// sample data:
val processed = sc.parallelize(List(
  (111, List(List(1473163148L, "abc"))),
  (111, List(List(1473163143L, "def"))),
  (111, List(List(1473163143L, "abd"))),
  (111, List(List(1473163139L, "asd"))),
  (111, List(List(1473163696L, "rtf"))),
  (111, List(List(1473163700L, "rgd"))),
  (111, List(List(1473168932L, "dmf"))))
)

// first - find the lowest timestamp (use ONE of these, depending on your input):
// if input isn't ordered:
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min()
// if input is already sorted by timestamp:
// val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong

def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
  // divide the DELTA between each timestamp and the first one by 30 minutes to find the bucket:
  l.groupBy(t => (t.productElement(0).toString.toLong - firstTimestamp) / (30*60)).values
}

// continue as you did:
val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y)
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap {
  case (k, l) => createGroups(l).map(sublist => (k, sublist))
}
separated.foreach(println)
// prints:
// (111,List(List(1473168932, dmf)))
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd)))
```
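For readers without a Spark shell handy, the same pipeline can be reproduced with plain Scala collections - a sketch under the assumption that `reduceByKey` becomes `groupBy` plus a flatten, with everything else unchanged (a shorter sample list is used here for brevity):

```scala
// Plain-collections version of the pipeline above (no SparkContext needed).
val processed: List[(Int, List[List[Any]])] = List(
  (111, List(List(1473163148L, "abc"))),
  (111, List(List(1473163143L, "def"))),
  (111, List(List(1473163139L, "asd"))),
  (111, List(List(1473168932L, "dmf"))))

// lowest timestamp across all entries:
val firstTimestamp: Long =
  processed.flatMap(_._2).map(_.head.toString.toLong).min

def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] =
  l.groupBy(e => (e.head.toString.toLong - firstTimestamp) / (30 * 60)).values

// reduceByKey equivalent on a local List:
val grouped: List[(Int, List[List[Any]])] =
  processed.groupBy(_._1).map { case (k, v) => (k, v.flatMap(_._2)) }.toList

val separated: List[(Int, List[List[Any]])] = grouped.flatMap {
  case (k, l) => createGroups(l).map(sublist => (k, sublist))
}
separated.foreach(println)
```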