
Spark finding gaps in timestamps

I have a pair RDD that consists of (Key, (Timestamp, Value)) entries.

When the data is read, the entries are sorted by timestamp, so each partition of the RDD should be ordered by timestamp. What I want to do is find, for every key, the biggest gap between two consecutive timestamps.
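
For illustration, here is a tiny made-up sample (spark-shell style, with sc being the SparkContext) and the result I'm after:

// Hypothetical sample: (key, (timestamp, value)) entries
val rdd = sc.parallelize(Seq(
  ("a", (1L, 10.0)), ("a", (4L, 11.0)), ("a", (9L, 12.0)),
  ("b", (2L, 20.0)), ("b", (3L, 21.0))
))
// Desired result: ("a", 5) and ("b", 1), the biggest gap between consecutive timestamps per key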

I have been thinking about this problem for a long time now, and I don't see how it could be done with the functions Spark provides. The problems I see are: I lose the ordering information when I do a simple map, so that is not an option. It also seems that groupByKey fails because there are too many entries for a specific key; trying it gives me a

java.io.IOException: No space left on device


Any help on how to approach this would be greatly appreciated.

Answer

As suggested by The Archetypal Paul, you can use DataFrames and window functions. First, the required imports:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
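
toDF and the $-column syntax used below also assume the SQL implicits are in scope; depending on the Spark version that is roughly:

// Spark 2.x+: assuming a SparkSession named spark
import spark.implicits._
// Spark 1.x: assuming a SQLContext named sqlContext
// import sqlContext.implicits._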

Next, the data has to be converted to a DataFrame:

val df = rdd.mapValues(_._1).toDF("key", "timestamp")

To be able to use the lag function, we'll need a window definition:

val keyTimestampWindow = Window.partitionBy("key").orderBy("timestamp")

which can be used to compute the gap between consecutive timestamps:

val withGap = df.withColumn(
  "gap", $"timestamp" - lag("timestamp", 1).over(keyTimestampWindow)
)

Finally, groupBy with max (lag returns null for the first row of each key, which max simply ignores):

withGap.groupBy("key").max("gap")
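
If you prefer an explicit name for the result column, agg with an alias works as well (maxGap is just an arbitrary name chosen here):

import org.apache.spark.sql.functions.max

// Same aggregation, but with a named result column
withGap.groupBy("key").agg(max("gap").alias("maxGap"))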

Following the second suggestion by The Archetypal Paul, you can sort by key and timestamp:

val sorted = rdd.mapValues(_._1).sortBy(identity)

With the data arranged like this, you can find the maximum gap for each key by sliding and reducing by key:

import org.apache.spark.mllib.rdd.RDDFunctions._

sorted.sliding(2).collect {
  case Array((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1)
}.reduceByKey(Math.max(_, _))
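
On the made-up sample above this yields ("a", 5) and ("b", 1); the cross-key window (("a", 9), ("b", 2)) is dropped by the key1 == key2 guard. Note that sliding from mllib's RDDFunctions builds windows across partition boundaries, so no consecutive pair is lost at partition edges.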

Another variant of the same idea is to repartition and sort first:

val partitionedAndSorted = rdd
  .mapValues(_._1)
  .repartitionAndSortWithinPartitions(
    new org.apache.spark.HashPartitioner(rdd.partitions.size)
  )
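
The HashPartitioner keeps the original number of partitions but routes all records of a given key to the same partition, so the per-partition sliding below sees every consecutive pair for that key.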

Data arranged like this can be transformed with mapPartitions and sliding:

val lagged = partitionedAndSorted.mapPartitions(_.sliding(2).collect {
  case Seq((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1)
}, preservesPartitioning = true)

and reduceByKey:

lagged.reduceByKey(Math.max(_, _))