Menkes Menkes - 1 month ago 15
Python Question

Aggregating on 5 minute windows in pyspark

I Have the following dataframe

df
:

User | Datetime | amount | length
A | 2016-01-01 12:01 | 10 | 20
A | 2016-01-01 12:03 | 6 | 10
A | 2016-01-01 12:05 | 1 | 3
A | 2016-01-01 12:06 | 3 | 5
B | 2016-01-01 12:01 | 10 | 20
B | 2016-01-01 12:02 | 8 | 20


And I want to use pyspark efficiently to aggregate over a 5 minute time window and do some calculations - so for example calculate the average amount & length for every use for every 5 minute time window - the df will look like this:

User | Datetime | amount | length
A | 2016-01-01 12:00 | 8 | 15
B | 2016-01-01 12:00 | 2 | 4
A | 2016-01-01 12:05 | 9 | 20


How can I achieve this in the most efficient way?
In pandas I used:

df.groupby(['cs_username', pd.TimeGrouper('5Min')].apply(...)

Answer

Unfortunately, in pyspark this won't look so cool like in pandas ;-) You can try casting date to timestamp and using modulo, for example:

import pyspark.sql.functions as F
seconds = 300
seconds_window = F.from_unixtime(F.unix_timestamp('date') - F.unix_timestamp('date') % seconds)
dataframe.withColumn('5_minutes_window', seconds_window)

Then you can simply group by new column and perform requested aggregations.