Johnnyboycurtis Johnnyboycurtis - 3 months ago 22
Python Question

PySpark distributing module imports

Over the past few days I've been working on trying to understand how Spark executors know how to use a module by a given name upon import. I am working on AWS EMR. Situation:
I initialize pyspark on EMR by typing

pyspark --master yarn

Then, in pyspark,

import numpy as np ## notice the naming

def myfun(x):
n = np.random.rand(1)
return x*n

rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!


My understanding is that when I import
numpy as np
, the master node is the only node importing and identifying
numpy
through
np
. However, with an EMR cluster (2 worker nodes), if I run the map function on the rdd, the driver program sends the function to the worker nodes to execute the function for each item in the list (for each partition), and a successful result is returned.

My question is this: How do the workers know that numpy should be imported as np? Each worker has numpy already installed, but I've not defined explicitly defined a way for each node to import the module
as np
.

Please refer to the following post by Cloudera for further details on dependencies:
http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

Under Complex Dependency they have an example (code) where the pandas module is explicitly imported on each node.

One theory that I've heard being thrown around is that the driver program distributes all code passed in the pyspark interactive shell. I am skeptical of this. The example I bring up to counter this idea is, if on the master node I type:

print "hello"


is every worker node also printing "hello"? I don't think so. But maybe I am wrong on this.

Answer

When function is serialized there is a number of objects is being saved:

  • code
  • globals
  • defaults
  • closure
  • dict

which can be later used to restore complete environment required for a given function.

Since np is referenced by the function it can be extracted from its code:

from pyspark.cloudpickle import CloudPickler

CloudPickler.extract_code_globals(myfun.__code__)
## {'np'}

and binding can be extracted from its globals:

myfun.__globals__['np']
## <module 'numpy' from ...

So serialized closure (in a broad sense) captures all information required to restore environment. Of course all modules accessed in the closure have to be importable on every worker machine.

Everything else is just reading and writing machinery.

On a side note master node shouldn't execute any Python code. It is responsible for resources allocation not running application code.

Comments