Scala question asked by lloydh, 24 days ago

How to get keys and values from MapType column in SparkSQL DataFrame

I have data in a Parquet file with two fields: object_id: String and alpha: Map<>.

It is read into a DataFrame in Spark SQL, and the schema looks like this:

scala> alphaDF.printSchema()
root
 |-- object_id: string (nullable = true)
 |-- ALPHA: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)


I am using Spark 2.0, and I am trying to create a new DataFrame whose columns are object_id plus the keys of the ALPHA map, i.e. object_id, key1, key2, key3, ...


I was first trying to see if I could at least access the map like this:

scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._ Support for serializing other
types will be added in future releases.
alphaDF.map(a => a(0)).collect()
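The error most likely arises because alphaDF is a DataFrame (a Dataset[Row]), so a(0) has the static type Any, for which Spark has no encoder. A minimal sketch, assuming a local SparkSession and a stand-in DataFrame shaped like the schema above, that uses Row's typed getters instead. The Payload case class and its fields x and y are hypothetical, since the question does not show the struct's fields.

```scala
import org.apache.spark.sql.{Row, SparkSession}

object EncoderFix {
  // Hypothetical struct type for the map values; the real fields are not shown in the question
  case class Payload(x: Int, y: String)

  def run(): (Seq[String], Seq[Seq[String]]) = {
    val spark = SparkSession.builder().master("local[*]").appName("encoder-fix").getOrCreate()
    import spark.implicits._
    try {
      // Stand-in for alphaDF: a string id plus a map from string keys to structs
      val alphaDF = Seq(
        ("o1", Map("k1" -> Payload(1, "a"), "k2" -> Payload(2, "b"))),
        ("o2", Map("k1" -> Payload(3, "c")))
      ).toDF("object_id", "ALPHA")

      // getString returns String, which has an encoder once spark.implicits._ is imported
      val ids = alphaDF.map(_.getString(0)).collect().toSeq

      // getMap exposes struct values as Rows; only the String keys are returned, as Seq[String]
      val keys = alphaDF.map(_.getMap[String, Row](1).keys.toSeq).collect().toSeq

      (ids, keys)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = {
    val (ids, keys) = run()
    println(ids.mkString(", "))
    keys.foreach(k => println(k.mkString(", ")))
  }
}
```

The key point is that the lambda's result type must be something Spark can encode (String, Seq[String], case classes), not Any or Row.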


but unfortunately I can't figure out how to access the keys of the map.

Can someone please show me a way to get object_id plus the map keys as column names, with the map values as the respective column values, in a new DataFrame?

Answer

The general method can be expressed in a few steps. First, the required imports:

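import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
// Needed for toDF, $"..." and .as[...]; spark-shell imports this automatically
import spark.implicits._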

and example data:

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")
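In this example data the tuple values are stored as structs with fields _1 and _2, playing the role of the struct values in the question's schema. Individual entries can already be read with getItem on the map and getField on the struct. A minimal sketch, assuming a local SparkSession; the object name ExampleData and the alias foo_1 are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

object ExampleData {
  def run(): Seq[(Int, Option[Int])] = {
    val spark = SparkSession.builder().master("local[*]").appName("example-data").getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(
        (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
        (2, Map("foo" -> (3, "c"))),
        (3, Map("bar" -> (4, "d")))
      ).toDF("id", "alpha")

      // getItem looks up a map key (null if absent); getField reads a field of the struct value
      ds.select($"id", $"alpha".getItem("foo").getField("_1").alias("foo_1"))
        .orderBy($"id")
        .as[(Int, Option[Int])] // tuple columns are bound by position
        .collect()
        .toSeq
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Rows without a "foo" key come back as None, which is the same null behaviour the final select relies on.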

We can use a UDF to extract the keys:

val keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

val keysDF = ds.select(keys($"alpha"))
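On Spark 2.3 or later (newer than the Spark 2.0 used in the question), the built-in map_keys function should be able to replace the UDF. A sketch under that version assumption, rebuilding the same example data in a local session; MapKeysExample is a hypothetical name:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.map_keys

object MapKeysExample {
  def run(): Array[Seq[String]] = {
    val spark = SparkSession.builder().master("local[*]").appName("map-keys").getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(
        (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
        (2, Map("foo" -> (3, "c"))),
        (3, Map("bar" -> (4, "d")))
      ).toDF("id", "alpha")

      // map_keys (Spark 2.3+) returns an array<string> column, so no UDF is needed
      ds.select(map_keys($"alpha")).as[Seq[String]].collect()
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit =
    run().foreach(keys => println(keys.mkString(", ")))
}
```

A built-in function avoids the serialization round trip of a UDF and is visible to the Catalyst optimizer.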

Find the distinct ones:

val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
  .collect.sorted
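As an alternative to the UDF plus flatMap, exploding a map column yields one row per entry with columns named key and value, which should also work on Spark 2.0. A minimal sketch, assuming the same example data in a local session; DistinctKeysExample is a hypothetical name:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object DistinctKeysExample {
  def run(): Array[String] = {
    val spark = SparkSession.builder().master("local[*]").appName("distinct-keys").getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(
        (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
        (2, Map("foo" -> (3, "c"))),
        (3, Map("bar" -> (4, "d")))
      ).toDF("id", "alpha")

      // explode on a map produces one row per (key, value) entry
      ds.select(explode($"alpha"))
        .select($"key")
        .distinct()
        .as[String]
        .collect()
        .sorted // sorted for a stable column order later
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```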

And finally select one column per key:

ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)
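Putting the three steps together, the following is a self-contained sketch, assuming a local SparkSession and the example data above (MapToColumns is a hypothetical name). It returns the resulting column names and rows so the shape of the wide DataFrame can be checked; keys missing from a row come back as null.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf

object MapToColumns {
  // Returns the column names and the rows (ordered by id) of the widened DataFrame
  def run(): (Seq[String], Seq[Row]) = {
    val spark = SparkSession.builder().master("local[*]").appName("map-to-columns").getOrCreate()
    import spark.implicits._
    try {
      val ds = Seq(
        (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
        (2, Map("foo" -> (3, "c"))),
        (3, Map("bar" -> (4, "d")))
      ).toDF("id", "alpha")

      // Step 1: UDF extracting the keys of each map
      val keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

      // Step 2: distinct keys across all rows, sorted for a stable column order
      val distinctKeys = ds.select(keys($"alpha")).as[Seq[String]]
        .flatMap(ks => ks).distinct().collect().sorted

      // Step 3: one column per key; getItem yields null when a row lacks that key
      val wide = ds.select($"id" +: distinctKeys.map(k => $"alpha".getItem(k).alias(k)): _*)
      (wide.columns.toSeq, wide.orderBy($"id").collect().toSeq)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = {
    val (cols, rows) = run()
    println(cols.mkString(", "))
    rows.foreach(println)
  }
}
```

Collecting the distinct keys to the driver is what lets the column list be built statically; this assumes the number of distinct keys is small enough to hold in driver memory.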