hbabbar - 3 months ago
Scala Question

How to filter data using window functions in spark

I have the following data :

rowid  uid  time  code
1      1    5     a
2      1    6     b
3      1    7     c
4      2    8     a
5      2    9     c
6      2    9     c
7      2    10    c
8      2    11    a
9      2    12    c


Now I want to filter the data so that rows 6 and 7 are removed: for a given uid, I want to keep only the first row of each consecutive run of rows with the value 'c' in code.

So the expected data should be :

rowid  uid  time  code
1      1    5     a
2      1    6     b
3      1    7     c
4      2    8     a
5      2    9     c
8      2    11    a
9      2    12    c


I'm using a window function, something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val window = Window.partitionBy("uid").orderBy("time")
val change = (lag("code", 1).over(window) <=> "c").cast("int")


This flags each row whose previous row (within the same uid) has code 'c'. Can I extend this to filter out rows and get the expected data?

Answer

If you want only one line for each distinct value of code, you could do it with a simple aggregation:

import org.apache.spark.sql.functions.{col, first}

val result = df
  .orderBy("time")
  .groupBy("uid", "code")
  .agg(
    first(col("rowid")).as("rowid"),
    first(col("time")).as("time")
  )
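One caveat: `first` after `orderBy` is not guaranteed to return the earliest row, because the `groupBy` shuffle can reorder partitions. If determinism matters, a hedged alternative (a sketch, assuming the same `df` schema as above) is to aggregate with `min` over a struct, which compares fields left to right and so picks the row with the smallest time:

```scala
import org.apache.spark.sql.functions.{col, min, struct}

// Take the row with the smallest time (ties broken by rowid) per (uid, code).
// Struct comparison is lexicographic over its fields, so min(struct(time, rowid))
// deterministically selects one row per group.
val resultDet = df
  .groupBy("uid", "code")
  .agg(min(struct(col("time"), col("rowid"))).as("m"))
  .select(
    col("m.rowid").as("rowid"),
    col("uid"),
    col("m.time").as("time"),
    col("code")
  )
```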

Otherwise, if you want to remove only the lines where code = "c" (except the first one for each uid), you could try the following:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val window = Window.partitionBy("uid", "code").orderBy("time")
val result = df
  .withColumn("rank", row_number().over(window))
  .where(
    (col("code") =!= "c") ||
    col("rank") === 1
  )
  .drop("rank")
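Note that the row_number approach keeps only the very first 'c' per uid, so it would also drop rowid 9, while the expected output keeps a 'c' whenever the previous row's code differs. A sketch that builds on the question's own lag idea (assuming the same `df` as above) drops only the consecutive duplicate 'c' rows and reproduces the expected output exactly:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// Look at the previous row's code within each uid, ordered by time.
val w = Window.partitionBy("uid").orderBy("time")

// Drop a row only when both it and its predecessor have code 'c'.
// <=> is null-safe equality, so the first row of each uid (lag = null)
// compares as false and is always kept.
val resultRuns = df
  .withColumn("prevCode", lag("code", 1).over(w))
  .where(!(col("code") === "c" && (col("prevCode") <=> "c")))
  .drop("prevCode")
```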