Matt Pollock - 2 months ago
R Question

How to unnest data with SparkR?

Using SparkR, how can nested arrays be "exploded along"? I've tried using explode like so:

# NSE-enabled pipeline (via SparkRext); metadata is an array column
dat <- nested_spark_df %>%
  mutate(a = explode(metadata)) %>%
  head()


but though the above does not cause an exception to be thrown, it does not promote the nested fields in metadata to the top level. Essentially I'm seeking behavior similar to that of Hive's LATERAL VIEW explode() functionality without relying on a HiveContext.

Note that in the code snippet I'm using the NSE enabled via SparkRext. I think the equivalent straight-SparkR would be something like ... %>% mutate(a=explode(nested_spark_df$metadata)) ... or something along those lines; a fuller sketch follows.
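For reference, here is what I believe the full straight-SparkR pipeline looks like (a sketch only; it assumes SparkR >= 1.6, where explode() is available as a column function, and magrittr for the pipe, which plain SparkR does not load for you):

library(magrittr)  # provides %>%; SparkRext pulls this in automatically

dat <- nested_spark_df %>%
  mutate(a = explode(nested_spark_df$metadata)) %>%  # explicit column reference
  head()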

EDIT



I've tried using LATERAL VIEW explode(...) in the SparkR::sql function. It seems to work great with Parquet and ORC data. However, when working with nested Avro data I tried:

dat <- collect(sql(HiveContext,
                   paste0("SELECT a.id, ax.arrival_airport, ax.arrival_runway ",
                          "FROM avrodb.flight a ",
                          "LATERAL VIEW explode(a.metadata) a AS ax ",
                          "WHERE ax.arrival_airport='ATL'")))


only to get the following error. When I swap out avrodb for parquetdb containing equivalent data, it does what I expect.

Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava


This is despite the fact that I included the Databricks Avro package when starting Spark. Reading the same data with Spark using a SQLContext (instead of the HiveContext) works fine, except that I haven't been able to figure out how to effectively use the explode() function. I've also confirmed that this is not an issue with the data itself by successfully querying the same files via Hive, using the same HQL statement I tried running with SparkR::sql(HiveContext, hql). The startup incantation is sketched below.
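For reference, this is roughly how the Avro package gets attached at SparkR startup (a sketch; the com.databricks:spark-avro_2.10:2.0.1 coordinates are an example and need to match your Spark/Scala build):

library(SparkR)

# attach spark-avro when creating the context, then build a HiveContext
sc <- sparkR.init(sparkPackages = "com.databricks:spark-avro_2.10:2.0.1")
HiveContext <- sparkRHive.init(sc)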

Answer

Thanks much to @Sim, I finally figured out a sane approach. The key is that after the explode operation, when all the exploded values are still nested one level deep, a select must be performed. For example:

dat <- nested_spark_df %>%
  mutate(a = explode(nested_spark_df$metadata)) %>%  # one row per array element
  select("id", "a.fld1", "a.fld2")                   # promote the nested fields

which will result in a SparkR DataFrame object with 3 columns: id, fld1, and fld2 (no a. prepended). A quick schema check is shown below.
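A quick way to verify that the fields were promoted (a sketch; fld1 and fld2 are placeholder field names from the example above):

printSchema(dat)  # should list id, fld1, fld2 at the top level
columns(dat)      # c("id", "fld1", "fld2")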

My mental block was that I was trying to get explode to act like Pig's FLATTEN, where it would create a bunch of new field names at the top level of the schema.
