zapstar zapstar - 3 months ago 56
Scala Question

How can I create a Spark DataFrame from a nested array of struct element?

I have read a JSON file into Spark. This file has the following structure:

scala> tweetBlob.printSchema
root
|-- related: struct (nullable = true)
| |-- next: struct (nullable = true)
| | |-- href: string (nullable = true)
|-- search: struct (nullable = true)
| |-- current: long (nullable = true)
| |-- results: long (nullable = true)
|-- tweets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- cde: struct (nullable = true)
...
...
| | |-- cdeInternal: struct (nullable = true)
...
...
| | |-- message: struct (nullable = true)
...
...


What I would ideally want is a DataFrame with columns "cde", "cdeInternal", "message"... as shown below

root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...


I have managed to use "explode" to extract elements from the "tweets" array into a column called "tweets"

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
|-- tweets: struct (nullable = true)
| |-- cde: struct (nullable = true)
...
...
| |-- cdeInternal: struct (nullable = true)
...
...
| |-- message: struct (nullable = true)
...
...


How can I select all columns inside the struct and create a DataFrame out of it? Explode does not work on a struct if my understanding is correct.

Any help is appreciated.

Answer

One possible way to handle this is to extract required information from the schema. Lets start with some dummy data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._


case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF

df.printSchema

// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

and a helper function:

def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}

Finally the results:

df.select(children("bar", df): _*).printSchema

// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
Comments