Newbiee Newbiee - 4 months ago 129
JSON Question

How to specify only particular fields using read.schema in JSON : SPARK Scala

I am trying to programmatically enforce a schema on a text file that contains JSON. I tried jsonFile, but when creating a DataFrame from a list of JSON files, Spark has to make one pass through the data to infer the schema. That means parsing all the data, which takes a long time (4 hours, since my data is compressed and terabytes in size). So I want to read it as a textFile and enforce a schema that keeps only the fields I am interested in, so that I can later query the resulting DataFrame. But I am not sure how to map the schema to the input. Can someone give me a reference on how to map a schema to JSON-like input?

input :

This is the full schema :

records: org.apache.spark.sql.DataFrame = [country: string, countryFeatures: string, customerId: string, homeCountry: string, homeCountryFeatures: string, places: array<struct<freeTrial:boolean,placeId:string,placeRating:bigint>>, siteName: string, siteId: string, siteTypeId: string, Timestamp: bigint, Timezone: string, countryId: string, pageId: string, homeId: string, pageType: string, model: string, requestId: string, sessionId: string, inputs: array<struct<inputName:string,inputType:string,inputId:string,offerType:string,originalRating:bigint,processed:boolean,rating:bigint,score:double,methodId:string>>]


But I am only interested in a few fields, like:

res45: Array[String] = Array({"requestId":"bnjinmm","siteName":"bueller","pageType":"ad","model":"prepare","inputs":[{"methodId":"436136582","inputType":"US","processed":true,"rating":0,"originalRating":1},{"methodId":"23232322","inputType":"UK","processed":false,"rating":0,"originalRating":1}]})


val records = sc.textFile("s3://testData/sample.json.gz")

val schema = StructType(Array(StructField("requestId",StringType,true),
StructField("siteName",StringType,true),
StructField("model",StringType,true),
StructField("pageType",StringType,true),
StructField("inputs", ArrayType(
StructType(
StructField("inputType",StringType,true),
StructField("originalRating",LongType,true),
StructField("processed",BooleanType,true),
StructField("rating",LongType,true),
StructField("methodId",StringType,true)
),true),true)))

val rowRDD = ??

val inputRDD = sqlContext.applySchema(rowRDD, schema)
inputRDD.registerTempTable("input")

sql("select * from input").foreach(println)


Is there any way to map this? Or do I need to use a JSON parser or something similar? I want to use textFile only because of the constraints above.

Tried with :

val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")


But I keep getting this error:

<console>:37: error: overloaded method value apply with alternatives:
(fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
StructField("inputs",ArrayType(StructType(StructField("inputType",StringType,true), StructField("originalRating",LongType,true), StructField("processed",BooleanType,true), StructField("rating",LongType,true), StructField("score",DoubleType,true), StructField("methodId",StringType,true)),true),true)))
^

Answer

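With a predefined schema, the data can be loaded with the following code, and Spark does not need to scan the compressed file to infer the schema. The code in the question fails because StructType is called with individual StructField arguments; as the error message shows, StructType.apply accepts only an Array, Seq, or java.util.List of fields, so the fields have to be wrapped in a collection.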

import org.apache.spark.sql.types._

// Element type of the "inputs" array.
val input = StructType(
  Array(
    StructField("inputType", StringType, true),
    StructField("originalRating", LongType, true),
    StructField("processed", BooleanType, true),
    StructField("rating", LongType, true),
    StructField("score", DoubleType, true),
    StructField("methodId", StringType, true)
  )
)

// Top-level schema: only the fields of interest are listed.
val schema = StructType(
  Array(
    StructField("requestId", StringType, true),
    StructField("siteName", StringType, true),
    StructField("model", StringType, true),
    StructField("inputs", ArrayType(input, true), true)
  )
)

// Supplying the schema up front skips the schema-inference pass.
val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")

Not all fields need to be listed in the schema, although it is good to provide them all if possible.
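
For the textFile route the question asks about, one option is to hand the RDD of JSON strings to the same reader. This is a minimal sketch, assuming a spark-shell session where sc and sqlContext are already defined (as in the question) and Spark 1.4 or later. The two inline records and the trimmed schema are made up for illustration; in practice the RDD would come from sc.textFile.

```scala
import org.apache.spark.sql.types._

// Same shape as the schema above, trimmed to two levels for brevity.
val inputSketch = StructType(Array(
  StructField("inputType", StringType, true),
  StructField("rating", LongType, true)))

val schemaSketch = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("inputs", ArrayType(inputSketch, true), true)))

// Illustrative stand-in for sc.textFile(...): one JSON object per line.
val lines = sc.parallelize(Seq(
  """{"requestId":"bnjinmm","siteName":"bueller","inputs":[{"inputType":"US","rating":0}]}""",
  """{"requestId":"abc","siteName":"other","inputs":[{"inputType":"UK","rating":1}]}"""))

// json(RDD[String]) applies the given schema; no inference pass is needed.
val df = sqlContext.read.schema(schemaSketch).json(lines)

// Fields that are not in the schema (siteName here) are simply left out.
df.printSchema()
```

The resulting DataFrame can then be registered as a table and queried as in the question.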

Spark does its best to parse every row. If a row is not valid JSON, the whole row is stored in a _corrupt_record column; with a user-supplied schema, that column has to be declared in the schema for the raw text to be kept.
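
To see how malformed rows surface, the corrupt-record column can be declared in the schema. This is a hedged sketch, assuming a spark-shell session with sc and sqlContext defined, the default column name _corrupt_record, and the default (permissive) parse mode. The sample lines are invented, and the exact behaviour may differ between Spark versions.

```scala
import org.apache.spark.sql.types._

// Declare the corrupt-record column explicitly so the raw text is kept.
val schemaWithCorrupt = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("_corrupt_record", StringType, true)))

// One well-formed record and one truncated record (illustrative data).
val sampleLines = sc.parallelize(Seq(
  """{"requestId":"bnjinmm"}""",
  """{"requestId":"broken" """))

val parsed = sqlContext.read.schema(schemaWithCorrupt).json(sampleLines)

// Both rows are returned; the malformed one is expected to carry its
// raw text in _corrupt_record instead of a parsed requestId.
parsed.show()
```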
