user3017842 - 5 months ago
Python Question

passing python functions as objects to spark

I've been playing with Spark and Python in this online Jupyter notebook, https://tmpnb.org/, and tried three ways of passing Python functions:

1) using map

import numpy as np

def my_sqrt(x):
    return np.sqrt(x)

sc.parallelize(range(10)).map(my_sqrt).collect()


2) parallelizing my_sqrt and calling it

sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()


3) parallelizing np.sqrt and calling it

sc.parallelize([(np.sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()


(1) and (3) work and (2) doesn't. First, I would like to understand why/how (1) and (3) work. Second, I would like to understand why (2) doesn't and what could be done to make it work.

Answer

The first approach works because Spark uses a special serialization strategy to process the closures required for transformations. It is significantly slower than standard pickle but also more powerful (otherwise we couldn't use .map(lambda x: ...)).
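To see why standard pickle alone would not be enough for closures, here is a minimal local sketch (no Spark involved). The helper name try_pickle is made up for illustration; it only reports whether pickle.dumps succeeds on a given object.

```python
import pickle


def try_pickle(obj):
    """Return True if standard pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Standard pickle stores functions by name, and a lambda has no
        # importable name to look up, so serialization fails.
        return False


# A lambda cannot be pickled by reference.
lambda_ok = try_pickle(lambda x: x + 1)

# A builtin such as len can, because it is reachable as builtins.len.
builtin_ok = try_pickle(len)

print("lambda picklable:", lambda_ok)
print("builtin picklable:", builtin_ok)
```

This is why a transformation such as .map(lambda x: ...) needs a serializer that ships the function's code rather than just its name.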

The last approach works because there is no need to serialize the function's code at all. The pickle only references sqrt from the numpy module, so as long as NumPy is importable on each worker there is no problem.
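The same pickling-by-reference behavior can be checked locally. The sketch below uses math.sqrt from the standard library as a stand-in for np.sqrt (an assumption made only so the snippet has no third-party dependency); the point is that the payload carries the module and function names, not the implementation.

```python
import math
import pickle

# Pickle a function that lives in an importable module.
payload = pickle.dumps(math.sqrt)

# The payload contains the module name and the function name...
has_module_name = b"math" in payload
has_function_name = b"sqrt" in payload

# ...and unpickling simply looks the function up again by that name,
# so we get back the very same object, not a copy of its code.
restored = pickle.loads(payload)
same_object = restored is math.sqrt

print(payload)
print(has_module_name, has_function_name, same_object)
```

Any process that can import the module can resolve the reference, which is the situation the workers are in with NumPy.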

The second approach doesn't work because standard pickling doesn't serialize a function's code, only a reference to it.

import pickle

pickle.dumps(my_sqrt)
## b'\x80\x03c__main__\nmy_sqrt\nq\x00.'

All it does is state: please give me the object assigned to the name my_sqrt (my_sqrt.__name__) in the top-level script environment (a.k.a. __main__). When the pickle is loaded on the workers, it runs in a different environment where no such object is in scope, hence the exception. To be clear, this is neither a bug nor something specific to Spark. You can easily reproduce the same behavior locally as follows:

In [1]: import pickle

In [2]: def foo(): ...

In [3]: foo_ = pickle.dumps(foo)

In [4]: pickle.loads(foo_)
Out[4]: <function __main__.foo>

In [5]: del foo

In [6]: pickle.loads(foo_)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
...

AttributeError: Can't get attribute 'foo' on <module '__main__'>

Since pickle doesn't concern itself with the actual value, only with the name, you can even reassign the name like this:

In [7]: foo = "foo"

In [8]: pickle.loads(foo_)
Out[8]: 'foo'

The takeaway is that if you want to use a function this way, put it in a separate module and distribute it to the workers the same way you do with other dependencies, including custom class definitions.
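A minimal sketch of that advice follows. It runs locally without Spark and only demonstrates that a function defined in a real module pickles as a reference to that module. The module name my_funcs and the temporary directory are hypothetical, and the function uses math.sqrt rather than np.sqrt to keep the snippet dependency-free. The commented lines at the end show how the file would probably be shipped with an existing SparkContext sc via sc.addPyFile; they are not executed here.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical module contents, written to disk for illustration.
MODULE_SOURCE = (
    "import math\n"
    "\n"
    "def my_sqrt(x):\n"
    "    return math.sqrt(x)\n"
)

module_dir = tempfile.mkdtemp()
module_path = os.path.join(module_dir, "my_funcs.py")
with open(module_path, "w") as f:
    f.write(MODULE_SOURCE)

# Make the module importable in this process.
sys.path.insert(0, module_dir)
importlib.invalidate_caches()
my_funcs = importlib.import_module("my_funcs")

# The pickle now refers to my_funcs.my_sqrt instead of __main__.my_sqrt.
payload = pickle.dumps(my_funcs.my_sqrt)
restored = pickle.loads(payload)
result = restored(9)
print(payload)
print(result)

# With a live SparkContext `sc` (assumed, not created here), the same file
# could be distributed to the workers before running approach (2):
#
#   sc.addPyFile(module_path)
#   sc.parallelize([(my_funcs.my_sqrt, i) for i in range(10)]) \
#     .map(lambda x: x[0](x[1])).collect()
```

Because the reference now points at an importable module rather than __main__, any worker that has the file on its Python path can resolve it.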
