Tim - 1 year ago 142
Scala Question

# Sequences in Spark dataframe

I have a dataframe in Spark. It looks like this:

``````
+-------+----------+-------+
|  value|     group|     ts|
+-------+----------+-------+
|      A|         X|      1|
|      B|         X|      2|
|      B|         X|      3|
|      D|         X|      4|
|      E|         X|      5|
|      A|         Y|      1|
|      C|         Y|      2|
+-------+----------+-------+
``````

End goal: I'd like to find how many sequences `A-B-E` there are (a sequence is just a list of subsequent rows), with the added constraint that subsequent parts of the sequence can be at most `n` rows apart. For this example, let `n` be 2.

Consider group `X`. In this case there is exactly one `D` between `B` and `E` (multiple consecutive `B`s are ignored), which means `B` and `E` are 1 row apart and thus there is a sequence `A-B-E`.

I considered using `collect_list()`, creating a string (like DNA) and using substring search with regex. But I was wondering if there's a more elegant distributed way, perhaps using window functions?
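
For reference, the string-based baseline described above can be tried on plain Scala collections before moving to Spark. This is only a sketch under several assumptions: `Event`, `countPattern` and `localCounts` are hypothetical names, every value is a single character, "ignoring" consecutive duplicates is read as collapsing each run into one row, "at most `n` rows apart" is read as at most `n` rows in between, and the regex counts non-overlapping matches only. In Spark, the per-group string would come from `collect_list()` rather than from a local `groupBy`.

```scala
// Hypothetical local stand-in for one dataframe row.
case class Event(value: String, group: String, ts: Int)

val events = Seq(
  Event("A", "X", 1), Event("B", "X", 2), Event("B", "X", 3),
  Event("D", "X", 4), Event("E", "X", 5),
  Event("A", "Y", 1), Event("C", "Y", 2)
)

// Count A-B-E occurrences with at most n rows between consecutive parts.
// Assumes single-character values and non-overlapping matches.
def countPattern(values: Seq[String], n: Int): Int = {
  // Collapse runs of the same character: "ABBDE" becomes "ABDE".
  val collapsed = values.mkString.replaceAll("(.)\\1+", "$1")
  val pattern = s"A.{0,$n}B.{0,$n}E".r
  pattern.findAllIn(collapsed).length
}

// One count per group, with rows ordered by ts inside each group.
val localCounts: Map[String, Int] =
  events.groupBy(_.group).map { case (g, rs) =>
    g -> countPattern(rs.sortBy(_.ts).map(_.value), n = 2)
  }
// localCounts("X") is 1, localCounts("Y") is 0
```

This works for small groups, but it builds the whole group as one string in memory, which is the concern when groups are arbitrarily long.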

Edit:

Note that the provided dataframe is just an example. The real dataframe (and thus the groups) can be arbitrarily long.

Edited to answer @Tim's comment + fix patterns of the type "AABE"

Yep, using a window function helps, but I created an `id` to have an ordering:

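``````
// Needed for toDF and the 'symbol column syntax
// (already in scope in spark-shell).
import spark.implicits._

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2)
).toDF("id","value","group","ts")

import org.apache.spark.sql.expressions.Window
// One window per group, ordered by the explicit id.
val w = Window.partitionBy('group).orderBy('id)
``````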

Then `lag` will collect what is needed, but a function is required to generate the `Column` expression. Note the `split`, which eliminates double counting of "AABE". WARNING (TODO): this creates a bug on patterns of the type "ABAE", which are then ignored:

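``````
import org.apache.spark.sql.functions._

// For each row, concatenate the next 2*m values in the window
// (the negative lag offset looks ahead), then keep only the part
// before the first "A".
def createSeq(m: Int) = split(
  concat(
    (1 to 2*m)
      .map(i => coalesce(lag('value, -i).over(w), lit("")))
    : _*), "A")(0)

val m = 2
val tmp = df
  .withColumn("seq", createSeq(m))
tmp.show

+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
|  6|    A|    Y|  1|   C|
|  7|    C|    Y|  2|    |
|  1|    A|    X|  1|BBDE|
|  2|    B|    X|  2| BDE|
|  3|    B|    X|  3|  DE|
|  4|    D|    X|  4|   E|
|  5|    E|    X|  5|    |
+---+-----+-----+---+----+
``````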
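
To see what ends up in the `seq` column without starting Spark, the same computation can be mimicked on a single, already ordered partition. This is a rough local sketch, not the Spark code itself: `seqColumn` is a hypothetical helper, values are assumed to be single characters, and `takeWhile` stands in for "split on `A` and keep the first piece".

```scala
// Mimics the seq column for one ordered partition:
// concatenate the next 2*m values after each row, then keep
// only the characters before the first 'A'.
def seqColumn(values: Vector[String], m: Int): Vector[String] =
  values.indices.toVector.map { i =>
    val lookahead = values.slice(i + 1, i + 1 + 2 * m).mkString
    lookahead.takeWhile(_ != 'A')
  }

// Group X from the example, with m = 2.
val seqX = seqColumn(Vector("A", "B", "B", "D", "E"), 2)
// Vector("BBDE", "BDE", "DE", "E", "")

// The "ABAE" case from the warning: the look-ahead of the first A
// is cut at the second A, so its E is never seen.
val seqAbae = seqColumn(Vector("A", "B", "A", "E"), 2)
// seqAbae.head is "B"
```

The second call illustrates why patterns of the type "ABAE" are dropped: cutting at the next `A` removes the `E` that would have completed the sequence.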

Because the `Column` API offers only a poor set of collection functions, avoiding regex altogether is much easier with a UDF:

``````
// True if, after splitting on "B", some chunk contains an "E"
// at index <= m.
def patternInSeq(m: Int) = udf((str: String) =>
  str
    .split("B")
    .filter(_.contains("E"))
    .exists(_.indexOf("E") <= m)
)

val res = tmp
  // keep rows that start with A and have a B in their look-ahead
  .filter(('value === "A") && (locate("B",'seq) > 0))
  // the B must be close enough, and an E must appear after position 1
  .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1))
  .filter(patternInSeq(m)('seq))
  .groupBy('group)
  .count
res.show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+
``````
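
The filter chain can also be checked on plain strings, which makes it easier to probe edge cases. The sketch below restates the three filters as one local predicate; `isMatch` is a hypothetical name, and the `+ 1` converts Scala's 0-based `indexOf` to the 1-based position that Spark's `locate` returns (0 when the substring is absent).

```scala
// Local restatement of the filters applied to (value, seq).
def isMatch(value: String, seq: String, m: Int): Boolean = {
  val bPos = seq.indexOf("B") + 1 // 1-based, 0 if there is no B
  val ePos = seq.indexOf("E") + 1 // 1-based, 0 if there is no E
  val udfPart = seq.split("B").exists { chunk =>
    val e = chunk.indexOf("E")
    e >= 0 && e <= m
  }
  value == "A" && bPos > 0 && bPos <= m && ePos > 1 && udfPart
}

val hitX  = isMatch("A", "BBDE", 2)  // true: row id 1 of group X
val missY = isMatch("A", "C", 2)     // false: no B or E follows
val tooFar = isMatch("A", "BDDDE", 2) // false: E is 3 characters after B
```

Only the first row of group `X` passes, which agrees with the count of 1 shown above.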