Thoth Thoth - 2 months ago 10
Python Question

Can't pickle <class '__main__.JobQueueManager'>

I am encountering a picklability problem in this code (also attached below). I have read relevant posts [1] [2] but I can not find usefull correspodences. Could you please give an explanation or solution of this error?

Below these is the part of the code that returns the error:

pickle.PicklingError: Can't pickle <class '__main__.JobQueueManager'>: it's not found as __main__.JobQueueManager


Thanks!

def make_server_manager(port, authkey):

job_q = Queue.Queue()
result_q = Queue.Queue()

class JobQueueManager(SyncManager):
pass

JobQueueManager.register('get_job_q', callable=lambda: job_q)
JobQueueManager.register('get_result_q', callable=lambda: result_q)

manager = JobQueueManager(address=('', port), authkey=authkey)
manager.start()
print 'Server started at port %s' % port
return manager


PS: Python 2.7.7, Win 7

Answer

As best as I can tell, to make this pattern work on Windows, you need to create a picklable queue.Queue. You can do that by creating a child class of Queue that defines __setstate__ and __getstate__, and have it only pickle the pieces of state that we actually need to send between processes, and leave the other stuff (unpicklable internal locks) out.

The other changes we need to make are to move the custom Manager class definitions to the top-level, and to not use lambda functions as the argument to callable. Instead, we use a partial and a top-level function, because that can be pickled. Here's the final code:

import sys
from multiprocessing.managers import SyncManager
from functools import partial
import multiprocessing
from Queue import Queue as _Queue

class Queue(_Queue):
    """ A picklable queue. """   
    def __getstate__(self):
        # Only pickle the state we care about
        return (self.maxsize, self.queue, self.unfinished_tasks)

    def __setstate__(self, state):
        # Re-initialize the object, then overwrite the default state with
        # our pickled state.
        Queue.__init__(self)
        self.maxsize = state[0]
        self.queue = state[1]
        self.unfinished_tasks = state[2]


def get_q(q):
    return q

class JobQueueManager(SyncManager):
    pass


def make_server_manager(port, authkey):
    job_q = Queue()
    result_q = Queue()

    job_q.put("hey")
    JobQueueManager.register('get_job_q', callable=partial(get_q, job_q))
    JobQueueManager.register('get_result_q', callable=partial(get_q, result_q))

    manager = JobQueueManager(address=('', port), authkey=authkey)
    #manager.start()
    print('Server started at port %s' % port)
    return manager

def make_client_manager(port, authkey):
    JobQueueManager.register('get_job_q')
    JobQueueManager.register('get_result_q')
    manager = JobQueueManager(address=('localhost', port), authkey=authkey)
    manager.connect()
    queue = manager.get_job_q()
    print("got queue {}".format(queue))
    print(queue.get_nowait())

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--client":
        make_client_manager(50000, 'abcdefg')
    else:
        manager = make_server_manager(50000, "abcdefg")
        server = manager.get_server()
        server.serve_forever()