Alex Elkman - 3 months ago
Python Question

Refill multiprocessing Queue when empty?

I'm trying to make a multiprocessing Queue in Python 2.7 that fills up to its maxsize with processes, and then, while there are more processes to be done that haven't yet been put into the Queue, refills the Queue whenever any of the current procs finish. I'm trying to maximize performance, so the size of the Queue is numCores on the PC so that each core always has work (ideally the CPU will be at 100% use the whole time). I'm also trying to avoid context switching, which is why I only want this many in the Queue at any time.

For example, say there are 50 tasks to be done and the CPU has 4 cores, so the Queue will have maxsize 4. We start by filling the Queue with 4 processes, and immediately upon any of those 4 finishing (at which point there will be 3 in the Queue), a new proc is generated and sent to the queue. This continues until all 50 tasks have been generated and completed.

This task is proving to be difficult since I'm new to multiprocessing, and it also seems the join() function will not work for me, since it blocks until ALL of the procs in the Queue have completed, which is NOT what I want.

Here is my code right now:

def queuePut(q, thread):
    q.put(thread)


def launchThreads(threadList, performanceTestList, resultsPath, cofluentExeName):
    numThreads = len(threadList)
    threadsLeft = numThreads
    print "numThreads: " + str(numThreads)
    cpuCount = multiprocessing.cpu_count()
    q = multiprocessing.Queue(maxsize=cpuCount)
    count = 0
    while count != numThreads:
        while not q.full():
            thread = threadList[numThreads - threadsLeft]
            p = multiprocessing.Process(target=queuePut, args=(q, thread))
            print "Starting thread " + str(numThreads - threadsLeft)
            p.start()
            threadsLeft -= 1
            count += 1
            if threadsLeft == 0:
                threadsLeft += 1
                break


Here is where it gets called in code:

for i in testNames:
    p = multiprocessing.Process(target=worker, args=(i, paths[0], cofluentExeName,))
    jobs.append(p)

launchThreads(jobs, testNames, testDirectory, cofluentExeName)


The procs seem to get created and put into the queue. For an example run with 12 tasks and 40 cores, the output is as follows, followed by the error below:

numThreads: 12
Starting thread 0
Starting thread 1
Starting thread 2
Starting thread 3
Starting thread 4
Starting thread 5
Starting thread 6
Starting thread 7
Starting thread 8
Starting thread 9
Starting thread 10
Starting thread 11

Traceback (most recent call last):
  File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed
    send(obj)
  File "C:\Python27\lib\multiprocessing\process.py", line 290, in __reduce__
    'Pickling an AuthenticationString object is '
TypeError: Pickling an AuthenticationString object is disallowed for security reasons

(the same traceback is repeated for each process, with the lines interleaved)

Answer

Why don't you use a multiprocessing Pool to accomplish this?

import multiprocessing
pool = multiprocessing.Pool()
pool.map(your_function, dataset)  # dataset is a list; could be any other iterable
pool.close()
pool.join()

multiprocessing.Pool() takes an optional processes=N argument specifying the number of worker processes to start. If you don't pass it, the pool starts as many workers as you have cores (so 4 workers on a 4-core machine). When one job finishes, the pool automatically starts the next one; you don't have to manage that.

Multiprocessing: https://docs.python.org/2/library/multiprocessing.html
