Question

How to query a DataFrame where a StringType field contains a JSON value in Spark SQL

I am trying to use SQL on a Spark DataFrame, but one of the columns is a string that holds a JSON-like structure.

I registered my DataFrame as a temp table: TestTable

When I run desc on it:

col_name data_type
requestId string
name string
features string


But the features value is a JSON string:

{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}


I just want to query TestTable where totalSpent > 10. Can someone tell me how to do this?

My JSON file looks like:

{
  "requestId": 232323,
  "name": "ravi",
  "features": "{\"places\":11,\"movies\":2,\"totalPlacesVisited\":0,\"totalSpent\":13,\"SpentMap\":{\"Movie\":2,\"Park Visit\":11},\"benefits\":{\"freeTime\":13}}"
}


features is a string; I only need totalSpent from it. I tried:

val features = StructType(Array(
  StructField("totalSpent", LongType, true),
  StructField("movies", LongType, true)
))

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StringType, true),
  StructField("features", features, true)
))

val records = sqlContext.read.schema(schema).json(filePath)


since each request has a single JSON string for features. But this gives me an error.

When I tried with

val records = sqlContext.jsonFile(filePath)

records.printSchema


it shows me:

root
|-- requestId: string (nullable = true)
|-- features: string (nullable = true)
|-- name: string (nullable = true)


Can I use parallelize inside StructField while creating the schema? I first tried:

val customer = StructField("features", StringType, true)
val events = sc.parallelize(customer :: Nil)

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StructType(events, true), true),
  StructField("features", features, true)
))


This gives me an error as well. I also tried:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

This gives me:
<console>:78: error: object liftweb is not a member of package net
import net.liftweb.json.parse


Then I tried:

val parseJson = udf((s: String) => {
  sqlContext.read.json(s)
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show


But again I get an error.

Finally, I tried:

import org.json4s._
import org.json4s.jackson.JsonMethods._

val parseJson = udf((s: String) => {
  parse(s)
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show


But it gives me:

java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

Answer

When you return data from a UDF it has to be representable as SQL types, and a JSON AST is not. One approach is to create a case class similar to this one:

case class Features(
  places: Integer, 
  movies: Integer,
  totalPlacesVisited: Integer, 
  totalSpent: Integer,
  SpentMap: Map[String, Integer],
  benefits: Map[String, Integer]
) 

and use it to extract objects:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.functions.udf

val df = Seq((
  232323, "ravi",
  """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
)).toDF("requestId", "name", "features")

val extractFeatures = udf((features: String) => {
  // extract[Features] needs an implicit Formats in scope
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
})

val parsed = df.withColumn("features", extractFeatures($"features"))
parsed.show(false)

// +---------+----+-----------------------------------------------------------------+
// |requestId|name|features                                                         |
// +---------+----+-----------------------------------------------------------------+
// |232323   |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
// +---------+----+-----------------------------------------------------------------+

parsed.printSchema

// root
//  |-- requestId: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- features: struct (nullable = true)
//  |    |-- places: integer (nullable = true)
//  |    |-- movies: integer (nullable = true)
//  |    |-- totalPlacesVisited: integer (nullable = true)
//  |    |-- totalSpent: integer (nullable = true)
//  |    |-- SpentMap: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)
//  |    |-- benefits: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)
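
With features parsed into a struct, the filter from the original question (totalSpent > 10) becomes a plain query on a nested field. A minimal sketch, reusing the TestTable name from the question and the 1.x sqlContext API used throughout:

parsed.registerTempTable("TestTable")

// Nested struct fields are addressed with dot notation in Spark SQL
sqlContext.sql("SELECT requestId, name FROM TestTable WHERE features.totalSpent > 10").show

// or, equivalently, with the DataFrame API
parsed.filter($"features.totalSpent" > 10).show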

Depending on the other records and the expected usage, you should adjust the representation and add relevant error-handling logic.
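
For example, a minimal error-handling sketch wraps the extraction in Try so that malformed JSON yields a SQL NULL instead of failing the job; the extractFeaturesSafe name is illustrative, not part of the original answer:

import scala.util.Try

val extractFeaturesSafe = udf((features: String) =>
  Try {
    implicit val formats = DefaultFormats
    parse(features).extract[Features]
  }.toOption  // None is written as NULL in the resulting column
)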

For alternative approaches see How to query JSON data column using Spark DataFrames?
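
One such alternative, when only a single field is needed, is get_json_object from org.apache.spark.sql.functions (available since Spark 1.6). It extracts a value by JSONPath without defining a case class; it returns a string, so cast it before comparing:

import org.apache.spark.sql.functions.get_json_object

// records still holds features as a raw JSON string
records
  .where(get_json_object($"features", "$.totalSpent").cast("long") > 10)
  .show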
