Woody1193 - 4 months ago
Python Question

Multiprocessing, what does pool.ready do?

Suppose I have a pool with a few processes inside of a class that I use to do some processing, like this:

from multiprocessing import Pool

# NUM_PROCESSES and Item are defined elsewhere

class MyClass:

    def __init__(self):
        self.pool = Pool(processes=NUM_PROCESSES)
        self.pop = []
        self.finished = []

    def gen_pop(self):
        self.pop = [self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES)]
        while not self.check():
            continue
        # Do some other stuff

    def check(self):
        self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
        new_pop = []
        for f in self.finished:
            new_pop.append(self.pop[f].get(timeout=1))
            self.pop[f] = None
        # Do some other stuff


When I run this code I get a cPickle.PicklingError which states that a <type 'function'> can't be pickled. What this tells me is that one of the apply_async calls has not returned yet, so I am attempting to append a running function to another list. But this shouldn't be happening, because all running calls should have been filtered out using the ready() function.

On a related note, the actual nature of the Item class is unimportant, but what is important is that at the top of my Item.test function I have a print statement which is supposed to fire for debugging purposes. However, it never fires. This tells me that the function has been initiated but has not actually started execution.

So it appears that ready() does not actually tell me whether a call has finished execution. What exactly does ready() do, and how should I edit my code so that I can filter out the processes that are still running?

Answer

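Multiprocessing uses the pickle module internally to pass data between processes, so everything you send to a worker (the callable and its arguments, such as the Item() instance) must be picklable. See the list of what can be pickled in the pickle documentation: object methods are not on that list, and the cPickle in your traceback means you are on Python 2, where methods cannot be pickled.
To fix this quickly, call the method through a module-level wrapper function, which pickle can reference by name: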
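A minimal way to check this yourself is to call pickle.dumps on the object, which is roughly what Pool does to the callable and its arguments before handing them to a worker. This is a Python 3 sketch. The names is_picklable, square and make_adder are hypothetical, and because the exception type for unpicklable objects differs between Python versions, the helper catches exceptions broadly. Run it as a script so that square is found in __main__.

```python
import pickle

def is_picklable(obj):
    """Return True if pickle can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except Exception:  # PicklingError, AttributeError or TypeError depending on version
        return False
    return True

def square(x):
    # Module-level function: pickled by reference (module + name), so it works.
    return x * x

def make_adder(n):
    # Nested function: it has no importable name, so pickle cannot reference it.
    def add(x):
        return x + n
    return add

print(is_picklable(square))           # True
print(is_picklable(lambda x: x * x))  # False: lambdas have no importable name
print(is_picklable(make_adder(1)))    # False
```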

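def wrap_item_test(item):
    # Module-level function: pickle can send it to a worker by name.
    return item.test()  # return the result so that .get() receives it

class MyClass:
    # __init__ and check() unchanged from the question
    def gen_pop(self):
        self.pop = [self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES)]
        while not self.check():
            continue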
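As for ready() itself: AsyncResult.ready() returns whether the call has completed, and a call that raised an exception also counts as completed. successful() then tells a normal return apart from an error, and get() on a failed result re-raises the exception. This appears consistent with what the question observes: the task seemingly failed while being pickled, so it never reached a worker (hence no print), yet ready() was True and get() re-raised the PicklingError. The sketch below filters results on that basis. It uses multiprocessing.pool.ThreadPool, which exposes the same apply_async/AsyncResult interface but needs no pickling or process start-up; work and split_finished are hypothetical names.

```python
import time
from multiprocessing.pool import ThreadPool

def work(n):
    # Hypothetical task: odd inputs fail, to show that an error also counts as "ready".
    if n % 2:
        raise ValueError("bad input %d" % n)
    time.sleep(0.01 * n)
    return n * n

def split_finished(pending):
    """Split AsyncResults into finished values, failed results and still-running ones."""
    done, failed, still_running = [], [], []
    for res in pending:
        if not res.ready():
            still_running.append(res)
        elif res.successful():
            done.append(res.get())  # already finished, so get() does not block
        else:
            failed.append(res)      # res.get() would re-raise the task's exception
    return done, failed, still_running

pool = ThreadPool(4)
pending = [pool.apply_async(work, (n,)) for n in range(6)]
done, failed = [], []
while pending:
    d, f, pending = split_finished(pending)
    done.extend(d)
    failed.extend(f)
    time.sleep(0.005)  # avoid a tight busy loop while tasks are still running
pool.close()
pool.join()
print(sorted(done))  # [0, 4, 16]
print(len(failed))   # 3
```

With a process Pool the same loop applies, but work must then be defined at module level so that it can be pickled.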