Jack Lei Jack Lei - 1 month ago 9
Scala Question

Use more than one collect_list in one query in Spark SQL

I have the following dataframe

data
:

root
|-- userId: string
|-- product: string
|-- rating: double


and the following query:

val result = sqlContext.sql("select userId, collect_list(product), collect_list(rating) from data group by userId")


My question is that, does
product
and
rating
in the aggregated arrays match each other? That is, whether the
product
and the
rating
from the same row have the same index in the aggregated arrays.

Update:
Starting from Spark 2.0.0, one can do
collect_list
on struct type so we can do one
collect_list
on a combined column. But for pre 2.0.0 version, one can only use
collect_list
on primitive type.

Answer

I believe there is no explicit guarantee that all arrays will have the same order. Spark SQL uses multiple optimizations and under certain conditions there is no guarantee that all aggregations are scheduled at the same time (one example is aggregation with DISTINCT). Since exchange (shuffle) results in nondeterministic order it is theoretically possible that order will differ.

So while it should work in practice it could be risky and introduce some hard to detect bugs.

If you Spark 2.0.0 or later you can aggregate non-atomic columns with collect_list:

SELECT userId, collect_list(struct(product, rating)) FROM data GROUP BY userId

If you use an earlier version you can try to use explicit partitions and order:

WITH tmp AS (
  SELECT * FROM data DISTRIBUTE BY userId SORT BY userId, product, rating
)
SELECT userId, collect_list(product), collect_list(rating)
FROM tmp
GROUP BY userId