
Read a distributed Tab delimited CSV

Inspired by this question, I wrote some code to store an RDD (read from a Parquet file, with a schema of (photo_id, data)) as tab-delimited pairs, and, just as a detail, base64-encode the data, like this:

def do_pipeline(itr):
    item_id = x.photo_id

def toTabCSVLine(data):
    return '\t'.join(str(d) for d in data)

serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1])))

def format(data):
    return toTabCSVLine(serialize_vec_b64pkl(data))

dataset ='mydir')
lines ='outdir')
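
For reference, a single record pushed through format should come out as the id, a tab, and the base64 text of the pickled value; roughly like this (illustrative values, assuming base64 and cPickle are already imported):

# Illustrative only: shows the shape of one output line, not real photo data
print(format((1, {"foo": "bar"})))
# -> '1' + '\t' + the base64 of cPickle.dumps({"foo": "bar"})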

So now, the point of interest: how do I read that dataset back and, for example, print its deserialized data?

I am using Python 2.6.6.

My attempt is below; just to verify that everything can be done, I wrote this code:

deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1])))

base64_dataset = sc.textFile('outdir')
collected_base64_dataset = base64_dataset.collect()

This calls collect(), which is fine for testing, but in a real-world scenario it would struggle...
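
For what it's worth, on real data I would probably just sample a few lines instead of collecting everything, e.g. something like:

# Pull only a handful of lines to the driver instead of the whole dataset
for line in base64_dataset.take(5):
    print(line)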


When I tried zero323's suggestion:

foo =
print(foo)

I got this error, which boils down to this:

PythonRDD[2] at RDD at PythonRDD.scala:43
16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/", line 98, in main
command = pickleSer._read_with_length(infile)
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/", line 164, in _read_with_length
return self.loads(obj)
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/", line 422, in loads
return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new

at org.apache.spark.api.python.PythonRunner$$anon$
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.executor.Executor$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$

16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job
16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, TaskKilled (killed intentionally)
Py4JJavaError Traceback (most recent call last)
/homes/gsamaras/code/ in <module>()
17 print(
---> 19 foo = (
20 print(foo)

/home/gs/spark/current/python/lib/ in collect(self)
769 """
770 with SCCallSiteSync(self.context) as css:
--> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
772 return list(_load_from_socket(port, self._jrdd_deserializer))

/home/gs/spark/current/python/lib/ in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id,
815 for temp_arg in temp_args:

/home/gs/spark/current/python/lib/ in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.


Let's try a simple example. For convenience, I'll use the handy toolz library, but it isn't really required here.

import sys
import base64

if sys.version_info < (3, ):
    import cPickle as pickle
else:
    import pickle

from toolz.functoolz import compose

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})])

Now, your code is not exactly portable right now. In Python 2, base64.b64encode returns str, while in Python 3 it returns bytes. Let's illustrate that:

  • Python 2

    type(base64.b64encode(pickle.dumps({"foo": "bar"})))
    ## str
  • Python 3

    type(base64.b64encode(pickle.dumps({"foo": "bar"})))
    ## bytes

So let's add decoding to the pipeline:

# Equivalent to
# def pickle_and_b64(x):
#     return base64.b64encode(pickle.dumps(x)).decode("ascii")

pickle_and_b64 = compose(
    lambda x: x.decode("ascii"),
    base64.b64encode,
    pickle.dumps
)
Please note that this doesn't assume any particular shape of the data. Because of that, we'll use mapValues to serialize only the values:

serialized = rdd.mapValues(pickle_and_b64)
## (1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu')

Now we can follow it with a simple format and save:

from tempfile import mkdtemp
import os

outdir = os.path.join(mkdtemp(), "foo") x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir)
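
If you peek at the raw text files written above, each line is just the key, a tab, and the base64 string (the output shown is illustrative; exact ordering depends on partitioning):

sc.textFile(outdir).take(1)
## e.g. [u'1\tKGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu']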

To read the file we reverse the process:

# Equivalent to
# def b64_and_unpickle(x):
#     return pickle.loads(base64.b64decode(x))

b64_and_unpickle = compose(
    pickle.loads,
    base64.b64decode
)
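
A quick round-trip check (again with an illustrative value) to confirm the two helpers are inverses:

b64_and_unpickle(pickle_and_b64({"foo": "bar"}))
## {'foo': 'bar'}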

decoded = (sc.textFile(outdir)
    .map(lambda x: x.split("\t"))  # In Python 3 we could simply use str.split
    .mapValues(b64_and_unpickle))

## (u'1', {'foo': 'bar'})
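
Applied back to the data from the question, the same pattern should look roughly like this (untested sketch, reusing deserialize_vec_b64pkl and the 'outdir' path from the question; note that the photo_id will come back as a string unless you cast it):

# Untested sketch: split each line on the tab *before* unpickling,
# then rebuild the (photo_id, data) pairs with the question's helper.
deserialized = (sc.textFile('outdir')
    .map(lambda line: line.split("\t"))
    .map(deserialize_vec_b64pkl))

print(deserialized.take(1))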