> I'm writing a machine learning program with the following components: […]
Based on the comments, what you're really looking for is a way to update a shared object from a set of processes carrying out a CPU-bound task. The CPU-bound nature makes `multiprocessing` the obvious choice; if most of your work were I/O-bound, multithreading would have been the simpler one.
Your problem fits a simple server-client model: the clients use the server as a stateful store, no communication between child processes is needed, and no process needs to be synchronised with any other.

Thus, the simplest approach is to run a single server process that owns the shared store and have every worker connect to it as a client.
From the server's perspective, the identity of the clients doesn't matter - only their actions do. This can be accomplished with a customised manager from `multiprocessing`, like so:
```python
# server.py
from multiprocessing.managers import BaseManager

# this represents the data structure you've already implemented.
from ... import ExperienceTree

# An important note: the manager keeps the shared object alive only while
# proxies reference it. If all of your workers die, their references are
# released and the instance is garbage-collected. I have chosen to
# sidestep this here by using class variables and classmethods so that
# instances are never needed - you may define __init__, etc. if you so
# wish, but just be aware of what happens to your object once all
# workers are gone.
class ExperiencePool(object):

    tree = ExperienceTree()

    @classmethod
    def update(cls, experience_object):
        '''Update the tree with an experience object.'''
        cls.tree.update(experience_object)

    @classmethod
    def sample(cls):
        '''Sample the tree's experience objects.'''
        return cls.tree.sample()


# subclass BaseManager
class Server(BaseManager):
    pass


# register the class you just created - clients can now obtain an
# ExperiencePool proxy by calling Server.Shared_Experience_Pool().
Server.register('Shared_Experience_Pool', ExperiencePool)

if __name__ == '__main__':
    # run the server on port 8080 of the local machine, serving forever.
    # note: get_server() must be called on an unstarted manager, so we do
    # not use start() or a with-block here.
    server_process = Server(('localhost', 8080), authkey=b'none')
    server_process.get_server().serve_forever()
```
Now for all of your clients you can just do:
```python
# client.py - you can keep a separate client file for a learner and a simulator.
from multiprocessing.managers import BaseManager


class Server(BaseManager):
    pass


# register the typeid only - the client does not need the class itself,
# so there is no need to import anything from server.py.
Server.register('Shared_Experience_Pool')

if __name__ == '__main__':
    # connect to the server running on port 8080 of the local machine.
    server_process = Server(('localhost', 8080), authkey=b'none')
    server_process.connect()
    experience_pool = server_process.Shared_Experience_Pool()
    # now do your own thing and call experience_pool.sample() or
    # experience_pool.update(...) whenever you want.
```
You may then launch one server.py and as many workers as you want.
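Assuming the two files above are saved as server.py and client.py, launching the whole setup can look like this (a sketch; the backgrounding and the worker count are illustrative):

```shell
# start the server first - it blocks and serves forever
python server.py &

# then launch as many workers as you want
python client.py &   # e.g. a learner
python client.py &   # e.g. a simulator
wait
```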
Note, however, that this is not automatically race-free: your learners may receive stale data if they have to compete with a simulator node that is writing at the same time.
If you want readers to always see the latest writes, additionally take a lock whenever a simulator is writing, preventing your other processes from reading until the write finishes.
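As a sketch of that locking idea (`LockedExperiencePool` and its list-backed store are illustrative stand-ins for your `ExperiencePool` and `ExperienceTree`, not part of the code above), a `threading.Lock` inside the pool serialises writes against reads. Since the manager server handles each connected client in its own thread of one process, a plain threading lock is shared correctly:

```python
import threading


class LockedExperiencePool(object):
    # a single lock shared by all readers and writers
    _lock = threading.Lock()
    _store = []  # stand-in for your ExperienceTree

    @classmethod
    def update(cls, experience_object):
        with cls._lock:  # writers hold the lock for the whole write
            cls._store.append(experience_object)

    @classmethod
    def sample(cls):
        with cls._lock:  # readers block until any in-flight write finishes
            return list(cls._store)
```

Registering `LockedExperiencePool` with the manager instead of `ExperiencePool` gives every client write-then-read consistency at the cost of some contention.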