Matt Anthony - 1 year ago
R Question

SparkR gapply - function returns a multi-row R dataframe

Let's say I want to execute something as follows:

df = read.df(<some_address>)
gapply(df, <grouping_columns>, function(key, x) {
  return(data.frame(x, newcol1 = f1(x), newcol2 = f2(x)))
}, <schema>)

where the function returns multiple rows. The examples in the documentation (which, sadly, echo much of the Spark documentation in being trivially simple) don't help me determine whether this will be handled as I expect.

I would expect that, for k groups created in the DataFrame with n_i output rows for group i, the result of the gapply() call would have n_1 + n_2 + ... + n_k rows, with the key value repeated on each of the n_i rows produced for group i. However, the schema field suggests to me that this is not how it will be handled; in fact, it seems to imply that the result must be collapsed into a single row.

Hopefully this is clear, albeit theoretical (I'm sorry I can't share my actual code example). Can someone verify or explain how such a function will actually be treated?

Answer

Exact expectations regarding input and output are clearly stated in the official documentation:

Apply a function to each group of a SparkDataFrame. The function is to be applied to each group of the SparkDataFrame and should have only two parameters: grouping key and R data.frame corresponding to that key. The groups are chosen from SparkDataFrames column(s). The output of function should be a data.frame.

Schema specifies the row format of the resulting SparkDataFrame. It must represent R function’s output schema on the basis of Spark data types. The column names of the returned data.frame are set by user. Below is the data type mapping between R and Spark.

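In other words, your function should take a key and a data.frame of the rows corresponding to that key, and return a data.frame that can be represented using Spark SQL types, with the schema provided as the schema argument. There is no restriction on the number of rows: each group can return any number of rows, and the rows from all groups are combined into a single SparkDataFrame. Note that the key is not added to the output automatically; only the columns your function returns appear, but because x already contains the grouping column(s), returning data.frame(x, ...) keeps them. You could, for example, apply an identity transformation as follows: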

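library(SparkR)
sparkR.session()
df <- as.DataFrame(iris)  # dots in column names become underscores, e.g. Sepal_Width
# identity transformation: every group returns all of its rows unchanged
gapply(df, "Species", function(k, x) x, schema(df))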

or, in the same way, an aggregation:

gapply(df, "Species",
  function(k, x) {
    # one output row per group: the species and its maximum sepal width
    dplyr::summarize(dplyr::group_by(x, Species), max(Sepal_Width))
  },
  structType(
    structField("species", "string"),
    structField("max_s_width", "double")))

although in practice you should prefer aggregating directly on the SparkDataFrame (groupBy %>% agg), which runs natively in Spark instead of shipping each group to an R worker.
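If you want to check the row-count behaviour before running anything on a cluster, the per-group function can be exercised locally. Below is a minimal plain-R sketch (no Spark involved) that mimics how gapply stacks the per-group results: split the rows by key, call the function on each group, and rbind the outputs. local_gapply, add_cols and top2 are hypothetical names for illustration only, and the sketch is an approximation of SparkR, not its implementation: it uses base R's iris (so column names keep their dots, e.g. Sepal.Width rather than Sepal_Width) and does not enforce a schema.

```r
# Hypothetical helper: a local, Spark-free approximation of how gapply
# combines per-group results (not SparkR's actual implementation).
local_gapply <- function(df, cols, func) {
  # split the rows by the grouping column(s); drop = TRUE skips empty combinations
  groups <- split(df, df[cols], drop = TRUE)
  results <- lapply(groups, function(x) {
    key <- as.list(x[1, cols, drop = FALSE])  # the group's key value(s)
    func(key, x)
  })
  # stack every group's output; each group may contribute any number of rows
  out <- do.call(rbind, results)
  rownames(out) <- NULL
  out
}

# Hypothetical per-group function shaped like the one in the question:
# returns every input row plus two derived columns (n_i rows in, n_i rows out)
add_cols <- function(key, x) {
  data.frame(x,
             newcol1 = cumsum(x$Sepal.Width),                        # running total within the group
             newcol2 = rank(-x$Sepal.Width, ties.method = "first"))  # width rank within the group
}

# Hypothetical function that keeps only the two widest rows of each group
top2 <- function(key, x) {
  head(x[order(-x$Sepal.Width), ], 2)
}

res1 <- local_gapply(iris, "Species", add_cols)
res2 <- local_gapply(iris, "Species", top2)
nrow(res1)  # 150: 50 + 50 + 50
nrow(res2)  # 6: 2 rows from each of the 3 species
```

In both cases the total is the sum of the rows each group returns, and the Species value is repeated on every output row only because the function returns the columns of x; the same holds for a real gapply call as long as the schema lists all returned columns.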
