Python Question

Convert GroupBy Object to Ordered List in Pyspark

I'm using Spark 2.0.0 and dataframe.
Here is my input dataframe as

| id | year | qty |
|----|-------------|--------|
| a | 2012 | 10 |
| b | 2012 | 12 |
| c | 2013 | 5 |
| b | 2014 | 7 |
| c | 2012 | 3 |


What I want is

| id | year_2012 | year_2013 | year_2014 |
|----|-----------|-----------|-----------|
| a | 10 | 0 | 0 |
| b | 12 | 0 | 7 |
| c | 3 | 5 | 0 |


or

| id | yearly_qty |
|----|---------------|
| a | [10, 0, 0] |
| b | [12, 0, 7] |
| c | [3, 5, 0] |


The closest solution I found is collect_list(), but this function doesn't guarantee any order for the collected list. In my mind the solution should look like:

data.groupBy('id').agg(collect_function)


Is there a way to generate this without filtering out every id in a loop?

Answer

The first one can be easily achieved using pivot:

from itertools import chain

# Collect the distinct years once, sorted, to fix both the pivot columns and their order
years = sorted(chain(*df.select("year").distinct().collect()))
df.groupBy("id").pivot("year", years).sum("qty")
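
For reference, here is a minimal sketch reproducing the question's data (assuming an active SparkContext sc, as in the second snippet below) and what the pivot yields on it:

df = sc.parallelize([
    ("a", 2012, 10), ("b", 2012, 12), ("c", 2013, 5),
    ("b", 2014, 7), ("c", 2012, 3),
]).toDF(["id", "year", "qty"])

df.groupBy("id").pivot("year", years).sum("qty").show()

# +---+----+----+----+
# | id|2012|2013|2014|
# +---+----+----+----+
# |  a|  10|null|null|
# |  b|  12|null|   7|
# |  c|   3|   5|null|
# +---+----+----+----+
# (row order may vary)

Note that the missing combinations come out as null rather than 0.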

The pivoted result can then be converted to array form:

from pyspark.sql.functions import array, col

(...  # the pivoted DataFrame from above
    .na.fill(0)  # pivot leaves missing (id, year) combinations as null
    .select("id", array(*[col(str(x)) for x in years]).alias("yearly_qty")))
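
Putting the two steps together on the sample data should give the array form from the question:

(df.groupBy("id").pivot("year", years).sum("qty")
    .na.fill(0)
    .select("id", array(*[col(str(x)) for x in years]).alias("yearly_qty"))
    .show())

# +---+----------+
# | id|yearly_qty|
# +---+----------+
# |  a|[10, 0, 0]|
# |  b|[12, 0, 7]|
# |  c| [3, 5, 0]|
# +---+----------+
# (row order may vary)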

Obtaining the second form directly is probably not worth the fuss, since you'd have to fill in the blanks first. Nevertheless, you could try:

from pyspark.sql.functions import collect_list, struct, sort_array, broadcast

# Single-partition DataFrame containing every distinct year
years_df = sc.parallelize([(x, ) for x in years], 1).toDF(["year"])

(broadcast(years_df)
    .join(df.select("id").distinct())          # cross join: every (year, id) pair
    .join(df, ["year", "id"], "leftouter")     # attach the known quantities
    .na.fill(0)                                # missing pairs become qty = 0
    .groupBy("id")
    # structs compare field by field, so sorting (year, qty) structs orders by year;
    # .qty then extracts the quantities from the sorted array of structs
    .agg(sort_array(collect_list(struct("year", "qty"))).qty.alias("qty")))

It also requires Spark 2.0+, which added support for collecting structs.
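
Assuming the expression above is bound to a name, say result (introduced here just for illustration), on the sample data it should produce the desired list form:

result.show()

# +---+----------+
# | id|       qty|
# +---+----------+
# |  a|[10, 0, 0]|
# |  b|[12, 0, 7]|
# |  c| [3, 5, 0]|
# +---+----------+
# (row order may vary)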

Both methods are quite expensive, so be careful when using them. As a rule of thumb, long is better than wide.
