Kris - 6 months ago
Python Question

Multiprocessing: Passing a class instance to pool.map

I swear I saw the following in an example somewhere, but now I can't find that example and this isn't working. The __call__ class function never gets called.

EDIT: Code updated

pool.map appears to start the QueueWriter instance and the __call__ function is reached. However, the workers never seem to start or at least no results are pulled from the queue. Is my queue set up the right way? Why do the workers not fire off?

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs):
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print self.path
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")
        while 1:
            res = q.get()
            if res == 'kill':
                self.log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                self.log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(self, idx)
        self.queue.put((idx,res))
        return res

    def workhorse(self,idx):
        #in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
    # log = open(os.path.expanduser('~/minimal.log'), 'w',1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in xrange(10000)]
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
    watcher = pool.map(qWriter, (q,),1)
    wrkr = Worker(queue=q,grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    result.get()
    q.put('kill')
    pool.close()
    pool.join()


So the log does indeed print the initialization message, but then the __call__ function is never called. Is this one of those pickling issues I've seen discussed so often? I've found answers about class member functions, but what about class instances?

Answer

At the gentle and patient prodding of martineau (thanks!) I think I've ironed out the problems. I have yet to apply it to my original code, but it is working in the example above and I'll start new questions for future implementation problems.

So in addition to changing where in the code the target file (the log, in this example) gets opened, I also started the QueueWriter instance as a single multiprocessing process rather than using pool.map. As martineau pointed out, the map call blocks until qWriter.__call__() returns, and since the writer loops until it receives 'kill', the code never got as far as starting the workers.
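To see the blocking behaviour in isolation, here is a minimal sketch. It is written in Python 3 syntax (the listings in this thread are Python 2), and slow_square and demo are hypothetical names that do not appear in the original code. It contrasts Pool.map, which waits for every task and returns a plain list, with Pool.map_async, which returns an AsyncResult straight away and is the call that .get() belongs to.

```python
import multiprocessing as mp
import time


def slow_square(x):
    # Hypothetical stand-in for a long-running task
    time.sleep(0.2)
    return x * x


def demo():
    pool = mp.Pool(2)
    try:
        # map_async hands the tasks to the pool and returns immediately
        pending = pool.map_async(slow_square, [1, 2, 3])
        returned_early = not pending.ready()
        # .get() is where the waiting happens for the async variant
        async_values = pending.get()
        # map waits for all tasks itself and returns a plain list
        sync_values = pool.map(slow_square, [1, 2, 3])
    finally:
        pool.close()
        pool.join()
    return returned_early, async_values, sync_values


if __name__ == '__main__':
    print(demo())
```

The first element of the returned tuple reports whether map_async came back before its tasks had finished. A callable that never returns, such as a writer waiting on a queue, would keep pool.map waiting forever in the same way.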

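The question's code also had a few incidental bugs, which are fixed below: QueueWriter.__call__ wrote to self.log although the file handle is the local variable log, Worker.__call__ passed self a second time in self.workhorse(self, idx), and result.get() was called on the plain list that pool.map returns.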

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs): 
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print self.path
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")    
        while 1:
            res = q.get()
            if res == 'kill':
                log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(idx)
        self.queue.put((idx,res))
        return res

    def workhorse(self,idx):
        #in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
    # log = open(os.path.expanduser('~/minimal.log'), 'w',1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in xrange(10000)] 
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
    # watcher = pool.map(qWriter, (q,),1)
    # Start the writer as a single process rather than through the pool
    p = mp.Process(target=qWriter, args=(q,))
    p.start()
    wrkr = Worker(queue=q,grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    # result.get()
    # not required: pool.map already returns the finished list
    q.put('kill')
    pool.close()
    p.join()
    pool.join()
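
On the pickling part of the question: an instance of a class defined at module top level can generally be pickled as long as its attributes can be, and a Pool pickles the callable it is given before sending it to a worker. An open file object is not picklable, which may be one reason opening the log inside __call__ helps (that motive is an assumption; the answer does not state it). The sketch below uses Python 3 syntax, and PathWriter, HandleWriter and can_pickle are hypothetical names for illustration only.

```python
import os
import pickle
import tempfile


class PathWriter(object):
    """Stores only the path and opens the file when called."""
    def __init__(self, path):
        self.path = path

    def __call__(self, message):
        with open(self.path, "a") as log:
            log.write(message + "\n")


class HandleWriter(object):
    """Stores an open file handle, which pickle rejects."""
    def __init__(self, path):
        self.log = open(path, "a")


def can_pickle(obj):
    # Try the same serialization step a Pool performs on its callable
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


if __name__ == '__main__':
    fd, path = tempfile.mkstemp(suffix=".log")
    os.close(fd)
    by_path = PathWriter(path)
    by_handle = HandleWriter(path)
    print(can_pickle(by_path))
    print(can_pickle(by_handle))
    # A round-tripped copy is still usable as a callable
    pickle.loads(pickle.dumps(by_path))("hello")
    by_handle.log.close()
    os.remove(path)
```

Keeping only plain data such as the path on the instance, and acquiring the file handle inside the process that uses it, sidesteps the problem.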