dp0377 - 1 year ago 239
Python Question

pyspark: Save schemaRDD as json file

I am looking for a way to export data from Apache Spark to various other tools in JSON format. I presume there must be a really straightforward way to do it.

Example: I have the following JSON file 'jfile.json':

{"key":value_a1, "key2":value_b1}
{"key":value_a2, "key2":value_b2}

where each line of the file is a JSON object. Files of this kind can easily be read into PySpark (here via a SQLContext named sqlContext) with

jsonRDD = sqlContext.jsonFile('jfile.json')

and then look like (by calling jsonRDD.collect()):

[Row(key=value_a1, key2=value_b1),Row(key=value_a2, key2=value_b2)]

Now I want to save this kind of data back to a pure JSON file.

I found this entry on the Spark User list:


that claimed using


After doing this, the text file looks like

Row(key=value_a1, key2=value_b1)
Row(key=value_a2, key2=value_b2)

That is, the jsonRDD has simply been written to the file as-is. After reading the Spark User list entry, I expected some kind of "automagic" conversion back to JSON format. My goal is a file that looks like the 'jfile.json' shown at the beginning.

Am I missing a really obvious easy way to do this?

I read http://spark.apache.org/docs/latest/programming-guide.html and searched Google, the user list and Stack Overflow for answers, but almost all of them deal with reading and parsing JSON into Spark. I even bought the book 'Learning Spark', but the examples there (p. 71) just lead to the same output file as above.

Can anybody help me out here? I feel like I am missing just a small link here.

Cheers and thanks in advance!

Answer Source

I can't see an easy way to do it. One solution is to convert each element of the SchemaRDD to a String, ending up with an RDD[String] where each of the elements is formatted JSON for that row. So, you need to write your own JSON serializer. That's the easy part. It may not be super fast but it should work in parallel, and you already know how to save an RDD to a text file.
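For flat rows, the per-row conversion could look roughly like the sketch below. It assumes each row exposes asDict(), as pyspark.sql.Row does; row_to_json_line is a hypothetical helper name, and the commented usage line assumes the jsonRDD from the question and a running Spark context.

```python
import json


def row_to_json_line(row):
    """Serialize one flat row to a single-line JSON string.

    Assumption: `row` exposes asDict(), as pyspark.sql.Row does.
    """
    # sort_keys only makes the output order deterministic
    return json.dumps(row.asDict(), sort_keys=True)


# Hypothetical usage with the SchemaRDD from the question:
# jsonRDD.map(row_to_json_line).saveAsTextFile('jfile_out')
```

This shortcut does not convert nested rows recursively. Since a Row is a tuple, a nested row would most likely be written as a JSON array rather than an object, which is why the schema-guided traversal described next is needed for nested data.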

The key insight is that you can get a representation of the schema out of the SchemaRDD by calling the schema method. Then each Row handed to you by map needs to be traversed recursively in conjunction with the schema. This is actually an in-tandem list traversal for flat JSON, but you may also need to consider nested JSON.
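The in-tandem traversal can be sketched in plain Python as follows. This is not the Scala code mentioned in this answer, and it does not use the PySpark API: the schema here is a stand-in, a list of (name, sub_schema) pairs with None marking a leaf, and format_item and format_row are hypothetical names. With a real SchemaRDD the names and types would have to come from the StructType returned by schema.

```python
import json


def format_item(value, schema):
    """Convert one value into a JSON-ready object, guided by its schema.

    `schema` is None for a leaf value, or a list of (name, sub_schema)
    pairs for a struct. This is a stand-in, not pyspark's StructType.
    """
    if value is None:
        # missing values become JSON null
        return None
    if schema is None:
        # leaf: number, string or boolean is passed through unchanged
        return value
    # struct: walk the field values and field descriptions in tandem
    return {name: format_item(v, sub) for v, (name, sub) in zip(value, schema)}


def format_row(row, schema):
    """Serialize one row (a tuple of field values) to a JSON string."""
    return json.dumps(format_item(row, schema), sort_keys=True)


# A row with one flat field and one nested struct containing a missing value
schema = [("key", None), ("key2", [("a", None), ("b", None)])]
row = (1, ("x", None))
print(format_row(row, schema))  # {"key": 1, "key2": {"a": "x", "b": null}}
```

Arrays and maps are not handled in this sketch; they would need their own branches in format_item.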

The rest is just a small matter of Python, which I don't speak, but I do have this working in Scala in case it helps you. The parts where the Scala code gets dense actually don't depend on deep Spark knowledge so if you can understand the basic recursion and know Python you should be able to make it work. The bulk of the work for you is figuring out how to work with a pyspark.sql.Row and a pyspark.sql.StructType in the Python API.

One word of caution: I'm pretty sure my code doesn't yet work in the case of missing values -- the formatItem method needs to handle null elements.

Edit: In Spark 1.2.0 the toJSON method was introduced to SchemaRDD, making this a much simpler problem -- see the answer by @jegordon.
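With toJSON available, the whole export reduces to two chained calls. A minimal sketch, assuming Spark 1.2.0 or later; save_as_json is a hypothetical helper name, and the commented usage line assumes the jsonRDD from the question.

```python
def save_as_json(schema_rdd, path):
    """Write a SchemaRDD as text with one JSON object per line.

    Assumption: `schema_rdd` provides toJSON(), which returns an RDD of
    JSON strings (SchemaRDD in Spark 1.2.0 and later).
    """
    schema_rdd.toJSON().saveAsTextFile(path)


# Hypothetical usage with the SchemaRDD from the question:
# save_as_json(jsonRDD, 'jfile_out')
```

Note that saveAsTextFile writes a directory of part files rather than a single file, so the result is one JSON object per line spread across those parts.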
