Asked by CMCDragonkai, 1 month ago

Python 3: why do only functions and partials work with multiprocessing apply_async, but not closures or lambdas?

I was trying to use some closures in my multiprocessing code and it kept failing for no apparent reason. So I did a little test:

#!/usr/bin/env python3

import functools
from multiprocessing import Pool

def processing_function(unprocessed_data):
    return unprocessed_data

def callback_function(processed_data):
    print("FUNCTION: " + str(processed_data))

def create_processing_closure(initial_data):
    def processing_function(unprocessed_data):
        return initial_data + unprocessed_data
    return processing_function

def create_callback_closure():
    def callback(processed_data):
        print("CLOSURE: " + str(processed_data))
    return callback

def create_processing_lambda(initial_data):
    return lambda unprocessed_data: initial_data + unprocessed_data

def create_callback_lambda():
    return lambda processed_data: print("LAMBDA: " + str(processed_data))

def processing_partial(unprocessed_data1, unprocessed_data2):
    return (unprocessed_data1 + unprocessed_data2)

def callback_partial(initial_data, processed_data):
    print("PARTIAL: " + str(processed_data))

pool = Pool(processes=1)

print("Testing if they work normally...")

f1 = processing_function
f2 = callback_function

f2(f1(1))

f3 = create_processing_closure(1)
f4 = create_callback_closure()

f4(f3(1))

f5 = create_processing_lambda(1)
f6 = create_callback_lambda()

f6(f5(1))

f7 = functools.partial(processing_partial, 1)
f8 = functools.partial(callback_partial, 1)

f8(f7(1))

# bonus round!
x = 1
f9 = lambda unprocessed_data: unprocessed_data + x
f10 = lambda processed_data: print("GLOBAL LAMBDA: " + str(processed_data))

f10(f9(1))

print("Testing if they work in apply_async...")

# works
pool.apply_async(f1, args=(1,), callback=f2)
# doesn't work
pool.apply_async(f3, args=(1,), callback=f4)
# doesn't work
pool.apply_async(f5, args=(1,), callback=f6)
# works
pool.apply_async(f7, args=(1,), callback=f8)
# doesn't work
pool.apply_async(f9, args=(1,), callback=f10)

pool.close()
pool.join()


The results are:

> ./apply_async.py
Testing if they work normally...
FUNCTION: 1
CLOSURE: 2
LAMBDA: 2
PARTIAL: 2
GLOBAL LAMBDA: 2
Testing if they work in apply_async...
FUNCTION: 1
PARTIAL: 2


Can anyone explain this weird behavior?

Answer

Because those objects can't be transferred to another process: pickling a callable only stores its module and qualified name, never the function object itself. That is also why the failures here are silent — the pickling error is only raised when you call .get() on the AsyncResult that apply_async returns.

The partial works only because it wraps a function object that is itself a module-level global, so the underlying function can be pickled by name.

See the What can be pickled and unpickled section of the pickle module documentation:

  • functions defined at the top level of a module (using def, not lambda)
  • built-in functions defined at the top level of a module

[...]

Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. [2] This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised. [3]
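This "pickled by name, not by value" behaviour is easy to observe directly. A minimal sketch (the function name top_level is invented for illustration): the pickled payload of a module-level function is just a named reference, and unpickling resolves that name back to the very same object rather than rebuilding the code.

```python
import pickle

def top_level(x):
    return x + 1

payload = pickle.dumps(top_level)

# The payload contains a reference by name, not the function's bytecode.
assert b"top_level" in payload

# Unpickling performs a lookup of that name in the defining module,
# so it returns the same object, not a copy.
assert pickle.loads(payload) is top_level
```

This is also why the defining module must be importable on the other side: the unpickler has nothing to work with except the module and the name.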

Do note the multiprocessing Programming guidelines:

Picklability

Ensure that the arguments to the methods of proxies are picklable.

and

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
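Translating that advice back to the question: one picklable rewrite of the failing closure case is to keep the worker and callback at module level and bind the captured value with functools.partial. This is a sketch, not the only option; the names add and report are invented for it.

```python
import functools
from multiprocessing import Pool

# Module-level functions pickle by name; partial(add, 1) pickles as a
# reference to `add` plus the bound argument 1 -- both transfer fine.
def add(initial_data, unprocessed_data):
    return initial_data + unprocessed_data

def report(processed_data):
    print("PARTIAL WORKAROUND: " + str(processed_data))

if __name__ == "__main__":
    pool = Pool(processes=1)
    result = pool.apply_async(functools.partial(add, 1), args=(1,), callback=report)
    result.get()   # would raise here if pickling had failed
    pool.close()
    pool.join()
```

Calling result.get() (or at least checking the AsyncResult) is what surfaces pickling errors; the original test script never did, which is why the broken cases simply printed nothing.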

If you try to pickle each of your callables directly, you can see that the ones that pickle successfully are exactly the ones that multiprocessing managed to execute:

>>> import pickle
>>> f2(f1(1))
FUNCTION: 1
>>> pickle.dumps([f1, f2]) is not None
True
>>> f4(f3(1))
CLOSURE: 2
>>> pickle.dumps([f3, f4]) is not None
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'create_processing_closure.<locals>.processing_function'
>>> f6(f5(1))
LAMBDA: 2
>>> pickle.dumps([f5, f6]) is not None
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: Can't pickle local object 'create_processing_lambda.<locals>.<lambda>'
>>> f8(f7(1))
PARTIAL: 2
>>> pickle.dumps([f7, f8]) is not None
True
>>> f10(f9(1))
GLOBAL LAMBDA: 2
>>> pickle.dumps([f9, f10]) is not None
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
_pickle.PicklingError: Can't pickle <function <lambda> at 0x10994e8c8>: attribute lookup <lambda> on __main__ failed
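If you really do need closure-like state, a common workaround (sketched here with an invented class name, Adder) is a top-level class with __call__: the class itself pickles by name, while the instance's attributes pickle by value, so the whole callable survives the round trip that the closures and lambdas above failed.

```python
import pickle

class Adder:
    """Picklable stand-in for create_processing_closure(initial_data)."""
    def __init__(self, initial_data):
        self.initial_data = initial_data

    def __call__(self, unprocessed_data):
        return self.initial_data + unprocessed_data

f = Adder(1)
clone = pickle.loads(pickle.dumps(f))  # round-trips, unlike f3/f5/f9
assert clone(1) == 2
```

An Adder instance can therefore be passed to apply_async (as either the function or, wrapped appropriately, the callback) exactly like a plain module-level function.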