Newbiee - 5 months ago

Spark: How to parse multiple JSON objects containing an array of structs?

I am trying to get the average of the ratings across all JSON objects in a file. I loaded the file and converted it to a DataFrame, but I get an error when I try to compute the average.
Sample request:

{
"country": "France",
"customerId": "France001",
"visited": [
{
"placeName": "US",
"rating": "2.3",
"famousRest": "N/A",
"placeId": "AVBS34"

},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "SeriousPie",
"placeId": "VBSs34"

},
{
"placeName": "Canada",
"rating": "4.3",
"famousRest": "TimHortons",
"placeId": "AVBv4d"

}
]
}


So for this JSON, the US average rating will be (2.3 + 3.3)/2 = 2.8

{
"country": "Egypt",
"customerId": "Egypt009",
"visited": [
{
"placeName": "US",
"rating": "1.3",
"famousRest": "McDonald",
"placeId": "Dedcf3"

},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "EagleNest",
"placeId": "CDfet3"

}
]
}

{
"country": "Canada",
"customerId": "Canada012",
"visited": [
{
"placeName": "UK",
"rating": "3.3",
"famousRest": "N/A",
"placeId": "XSdce2"

}
]
}


For the Egypt request, the US average is (1.3 + 3.3)/2 = 2.3. The Canada request has no US entry, so it does not count.

So overall, the average rating will be (2.8 + 2.3)/2 = 2.55 (only two requests have 'US' in their visited list).
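To make the arithmetic above concrete, here is a minimal plain-Scala sketch (no Spark) of the "average of per-customer averages" I am after. The `AverageOfAverages` object, its `visits` map and the helper names are hypothetical, made up for illustration; the ratings are copied from the sample requests.

```scala
// Hypothetical in-memory model of the sample data:
// customerId -> (placeName, rating) pairs taken from the three requests.
object AverageOfAverages {
  val visits: Map[String, Seq[(String, Double)]] = Map(
    "France001" -> Seq(("US", 2.3), ("US", 3.3), ("Canada", 4.3)),
    "Egypt009"  -> Seq(("US", 1.3), ("US", 3.3)),
    "Canada012" -> Seq(("UK", 3.3))
  )

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Average rating per customer for one place.
  // Customers with no visit to that place are dropped.
  def perCustomerAverage(place: String): Map[String, Double] =
    visits
      .map { case (customer, vs) =>
        customer -> vs.collect { case (p, rating) if p == place => rating }
      }
      .filter { case (_, ratings) => ratings.nonEmpty }
      .map { case (customer, ratings) => customer -> mean(ratings) }

  // Average of the per-customer averages (assumes at least one customer matches).
  def overallAverage(place: String): Double =
    mean(perCustomerAverage(place).values.toSeq)

  def main(args: Array[String]): Unit = {
    println(perCustomerAverage("US")) // France001 and Egypt009 only
    println(overallAverage("US"))     // roughly 2.55 for the sample data
  }
}
```

Note that in this sample the pooled average over all four US ratings is also 2.55, but only because both customers have the same number of US visits. In general the two definitions give different numbers.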

My schema :

root
 |-- country: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- visited: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- placeId: string (nullable = true)
 |    |    |-- placeName: string (nullable = true)
 |    |    |-- famousRest: string (nullable = true)
 |    |    |-- rating: string (nullable = true)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show()


So basically I need the average of the ratings where placeName = 'US'. For example: AVG_RATING = (sum of the ratings in one JSON object where placeName is 'US') / (count of such visited entries), and FINAL_VALUE = (sum of AVG_RATING over all JSON objects that have placeName 'US') / (count of JSON objects that have placeName = 'US').

So far I tried :

df.registerTempTable("people")
sqlContext.sql("select avg(expResults.rank) from people LATERAL VIEW explode(visited)people AS expResults where expResults.placeName = 'US' ").collect().foreach(println)

val result = df.select("*").where(array_contains(df("visited.placeName"), "US"))

This gives the rows whose visited array contains US, but I am not sure how to parse through the list of structs.


Can someone tell me how to do this?

Answer

It looks like you want something like this:

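import org.apache.spark.sql.functions.{avg, explode}
import sqlContext.implicits._  // Enables the $"..." column syntax (already in scope in spark-shell)

val result = df
  .withColumn("visit", explode($"visited"))    // One row per visited entry
  .groupBy($"customerId", $"visit.placeName")  // Group by using dot syntax
  .agg(avg($"visit.rating".cast("double")).alias("tmp"))  // Per-customer average for each place
  .groupBy($"placeName").agg(avg($"tmp").alias("value"))  // Average of those averages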

After that you can filter the result for a place of your choice:

result.where($"placeName" === "US").show
// +---------+-----+
// |placeName|value|
// +---------+-----+
// |       US| 2.55|
// +---------+-----+
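If you only care about one place, a variant is to filter right after the explode, so that only matching visits are grouped. Below is a self-contained sketch of that idea, assuming a Spark 1.4+ style API as in the question (`SQLContext`, `read.json` on an `RDD[String]`). The `UsRatingSketch` object, `sampleLines` and `overallAverage` are hypothetical names, and the sample rows are trimmed to the fields used here, with one JSON object per string.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{avg, col, explode}

object UsRatingSketch {
  // Sample requests from the question, reduced to the fields used below.
  // Each string holds exactly one JSON object.
  val sampleLines: Seq[String] = Seq(
    """{"customerId":"France001","visited":[{"placeName":"US","rating":"2.3"},{"placeName":"US","rating":"3.3"},{"placeName":"Canada","rating":"4.3"}]}""",
    """{"customerId":"Egypt009","visited":[{"placeName":"US","rating":"1.3"},{"placeName":"US","rating":"3.3"}]}""",
    """{"customerId":"Canada012","visited":[{"placeName":"UK","rating":"3.3"}]}"""
  )

  // Average of the per-customer average ratings for one place.
  // Assumes at least one visit matches; otherwise the aggregate is null
  // and getDouble fails.
  def overallAverage(df: DataFrame, place: String): Double =
    df.select(col("customerId"), explode(col("visited")).alias("visit"))
      .where(col("visit.placeName") === place)                 // keep matching visits only
      .groupBy(col("customerId"))
      .agg(avg(col("visit.rating").cast("double")).alias("tmp")) // per-customer average
      .agg(avg(col("tmp")))                                      // average of the averages
      .first()
      .getDouble(0)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("us-rating-sketch")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read.json(sc.parallelize(sampleLines))
    println(overallAverage(df, "US"))

    sc.stop()
  }
}
```

This uses `col(...)` instead of `$"..."` only to avoid the implicits import inside the helper. Also, as far as I know the JSON reader in these Spark versions expects one complete object per line when reading from a file, so a pretty-printed file like the one in the question may need to be flattened first.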