homar - 2 months ago
Scala Question

How to do custom operations on GroupedData in Spark?

I want to rewrite some of my code written with RDDs to use DataFrames. It was working quite smoothly until I found this:

events
.keyBy(row => row.getServiceId + row.getClientCreateTimestamp + row.getClientId)
.reduceByKey((e1, e2) => if(e1.getClientSendTimestamp <= e2.getClientSendTimestamp) e1 else e2)
.values


It is simple to start with:

events
.groupBy(events("service_id"), events("client_create_timestamp"), events("client_id"))


but what's next? What if I'd like to iterate over every element in the current group? Is that even possible?
Thanks in advance.

Answer

GroupedData cannot be used directly. The data is not physically grouped; grouping is just a logical operation. You have to apply some variant of the agg method, for example:

events
 .groupBy($"service_id", $"client_create_timestamp", $"client_id")
 .min("client_send_timestamp")

or

events
 .groupBy($"service_id", $"client_create_timestamp", $"client_id")
 .agg(min($"client_send_timestamp"))

where client_send_timestamp is the column you want to aggregate.

If you want to keep more information than just the aggregated columns, either join the result back with the original data or use window functions - see Find maximum row per group in Spark DataFrame
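As a sketch, the window-function approach can reproduce the original reduceByKey logic (keep the whole row with the smallest client_send_timestamp per group). This assumes events is a DataFrame with the columns used in the question:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Partition by the grouping keys, order by the column we minimize over
val w = Window
  .partitionBy($"service_id", $"client_create_timestamp", $"client_id")
  .orderBy($"client_send_timestamp")

// Keep only the first row of each partition, then drop the helper column
val earliest = events
  .withColumn("rn", row_number.over(w))
  .where($"rn" === 1)
  .drop("rn")
```

Unlike min(...).agg, this keeps every column of the winning row, which matches the RDD version's semantics.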

Spark also supports User Defined Aggregate Functions - see How can I define and use a User-Defined Aggregate Function in Spark SQL?
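For illustration, a minimal UDAF computing the minimum of a long column might look like the following sketch (the class and object names are hypothetical; the required methods come from UserDefinedAggregateFunction):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF: minimum over a LongType column
object MinLong extends UserDefinedAggregateFunction {
  def inputSchema: StructType  = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("min", LongType) :: Nil)
  def dataType: DataType       = LongType
  def deterministic: Boolean   = true

  // Start from the largest possible value so any input lowers it
  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Long.MaxValue

  // Fold one input row into the buffer, skipping nulls
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer(0) = math.min(buffer.getLong(0), input.getLong(0))

  // Combine two partial buffers (one per partition)
  def merge(b1: MutableAggregationBuffer, b2: Row): Unit =
    b1(0) = math.min(b1.getLong(0), b2.getLong(0))

  def evaluate(buffer: Row): Any = buffer.getLong(0)
}
```

It would then be used like any built-in aggregate, e.g. .agg(MinLong($"client_send_timestamp")).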

Spark 2.0+

You could use Dataset.groupByKey, which exposes each group as an iterator.
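A sketch of the Dataset approach, assuming a case class matching the event columns (the field names here are assumptions based on the question's accessors):

```scala
// Hypothetical schema matching the columns used in the question
case class Event(
  serviceId: String,
  clientCreateTimestamp: Long,
  clientId: String,
  clientSendTimestamp: Long)

val ds = events.as[Event]  // requires matching column names/types

// reduceGroups mirrors the original reduceByKey: keep the event
// with the smallest clientSendTimestamp per key
val earliest = ds
  .groupByKey(e => (e.serviceId, e.clientCreateTimestamp, e.clientId))
  .reduceGroups((e1, e2) =>
    if (e1.clientSendTimestamp <= e2.clientSendTimestamp) e1 else e2)
  .map(_._2)  // drop the key, keep the winning row
```

If you genuinely need to iterate over all elements of a group, mapGroups((key, iter) => ...) gives you the iterator directly, at the cost of losing the ability to use partial aggregation.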