Cleb - 1 year ago 159
Python Question

# Multiprocessing in Python: how to implement a loop over "apply_async" as "map_async" using a callback function

I would like to integrate a system of differential equations for several parameter combinations using Python's multiprocessing module. So, the system should get integrated and the parameter combination should be stored as well as its index and the final value of one of the variables.

While that works fine when I use

apply_async
- which is already faster than doing it in a simple for-loop - I fail to implement the same thing using
map_async
which seems to be faster than
apply_async
. The callback function is never called and I have no clue why. Could anyone explain why this happens and how to get the same output using
map_async
apply_async
?!

Here is my code:

from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time

#my system of differential equations
def myODE (yn,tvec,allpara):

(x, y, z) = yn

a, b = allpara['para']

dx = -x + a*y + x*x*y
dy = b - a*y - x*x*y
dz = x*y

return (dx, dy, dz)

#returns the index of the parameter combination, the parameters and the integrated solution
#this way I know which parameter combination belongs to which outcome in the asynch-case
def runMyODE(yn,tvec,allpara):
return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,)))

#for reproducibility
seed(0)

#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)

numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 5 #number of parameter combinations

INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here

#create some initial conditions and random parameters
for combi in range(numComb):

INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0

PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly

#################################using loop over apply####################

#results will be stored in here
asyncResultsApply = []

#my callback function
def saveResultApply(result):
# storing the index, a, b and the final value of z
asyncResultsApply.append((result[0], result[1], result[2][2,-1]))

#start the multiprocessing part
pool = mp.Pool(processes=4)
for combi in range(numComb):
pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply)
pool.close()
pool.join()

for res in asyncResultsApply:
print res[0], res[1], res[2] #printing the index, a, b and the final value of z

#######################################using map#####################
#the only difference is that the for loop is replaced by a "map_async" call
print "\n\nnow using map\n\n"
asyncResultsMap = []

#my callback function which is never called
def saveResultMap(result):
# storing the index, a, b and the final value of z
asyncResultsMap.append((result[0], result[1], result[2][2,-1]))

pool = mp.Pool(processes=4)
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap)
pool.close()
pool.join()

#this does not work yet
for res in asyncResultsMap:
print res[0], res[1], res[2] #printing the index, a, b and the final value of z

If I understood you correctly, it stems from something that confuses people quite often. apply_async's callback is called after the single op, but so does map's - it does not call the callback on each element, but rather once on the entire result.

You are correct in noting that map is faster than apply_asyncs. If you want something to happen after each result, there are a few ways to go:

1. You can effectively add the callback to the operation you want to be performed on each element, and map using that.

2. You could use imap (or imap_unordered) in a loop, and do the callback within the loop body. Of course, this means that all will be performed in the parent process, but the nature of stuff written as callbacks means that's usually not a problem (it tends to be cheap functions). YMMV.

For example, suppose you have the functions f and cb, and you'd like to map f on es with cb for each op. Then you could either do:

def look_ma_no_cb(e):
r = f(e)
cb(r)
return r

p = multiprocessing.Pool()
p.map(look_ma_no_cb, es)

or

for r in p.imap(f, es):
cb(r)
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download