Cleb - 1 year ago 140

Python Question

I would like to integrate a system of differential equations for several parameter combinations using Python's multiprocessing module. For each combination, the system should be integrated, and the parameter combination, its index, and the final value of one of the variables should be stored.

While that works fine when I use `apply_async` in a loop, the callback is never called when I use `map_async` instead. I would prefer `map_async`, since it seems to be faster than `apply_async`.

Here is my code:

```
from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time

#my system of differential equations
def myODE (yn,tvec,allpara):
    (x, y, z) = yn
    a, b = allpara['para']
    dx = -x + a*y + x*x*y
    dy = b - a*y - x*x*y
    dz = x*y
    return (dx, dy, dz)

#returns the index of the parameter combination, the parameters and the integrated solution
#this way I know which parameter combination belongs to which outcome in the asynch-case
def runMyODE(yn,tvec,allpara):
    return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,)))

#for reproducibility
seed(0)

#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)

numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 5 #number of parameter combinations

INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here

#create some initial conditions and random parameters
for combi in range(numComb):
    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0
    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly

#################################using loop over apply####################

#results will be stored in here
asyncResultsApply = []

#my callback function
def saveResultApply(result):
    # storing the index, a, b and the final value of z
    asyncResultsApply.append((result[0], result[1], result[2][2,-1]))

#start the multiprocessing part
pool = mp.Pool(processes=4)
for combi in range(numComb):
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply)
pool.close()
pool.join()

for res in asyncResultsApply:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

#######################################using map#####################

#the only difference is that the for loop is replaced by a "map_async" call
print "\n\nnow using map\n\n"
asyncResultsMap = []

#my callback function which is never called
def saveResultMap(result):
    # storing the index, a, b and the final value of z
    asyncResultsMap.append((result[0], result[1], result[2][2,-1]))

pool = mp.Pool(processes=4)
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap)
pool.close()
pool.join()

#this does not work yet
for res in asyncResultsMap:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z
```


Answer Source

If I understood you correctly, it stems from something that confuses people quite often. `apply_async`'s callback is called after its single operation completes, and so is `map_async`'s: it is not called on each element, but rather once on the entire result.
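To make the difference concrete, here is a minimal sketch (not the asker's ODE code). The names `square` and `compare_callbacks` are made up for illustration, and the pool is passed in as a parameter so the function works with any pool object. It simply records what each kind of callback receives:

```python
import multiprocessing as mp


def square(x):
    # Stand-in for an expensive task. It is a top-level function so that
    # a process pool can pickle it and send it to the workers.
    return x * x


def compare_callbacks(pool):
    """Return what the apply_async and map_async callbacks each receive."""
    per_task = []  # the apply_async callback appends here, once per task
    per_map = []   # the map_async callback appends here, once per map call

    handles = [pool.apply_async(square, (i,), callback=per_task.append)
               for i in range(4)]
    for handle in handles:
        handle.wait()  # block until this task (and its callback) is done

    pool.map_async(square, range(4), callback=per_map.append).wait()

    # Tasks may finish in any order, so sort the per-task values.
    return sorted(per_task), per_map


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        print(compare_callbacks(pool))
```

This should print `([0, 1, 4, 9], [[0, 1, 4, 9]])`: four separate callback calls for `apply_async`, but a single call carrying the whole list for `map_async`. A `map_async` callback that indexes its argument as if it were one result will therefore not do what was intended.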

You are correct in noting that `map` is faster than `apply_async`s. If you want something to happen after each result, there are a few ways to go:

- You can effectively add the callback to the operation you want to be performed on each element, and `map` using that.
- You could use `imap` (or `imap_unordered`) in a loop, and do the callback within the loop body. Of course, this means that all callbacks will be performed in the parent process, but the nature of stuff written as callbacks means that's usually not a problem (they tend to be cheap functions). YMMV.

For example, suppose you have the functions `f` and `cb`, and you'd like to `map` `f` on `es` with `cb` for each op. Then you could either do:

```
import multiprocessing

def look_ma_no_cb(e):
    r = f(e)
    cb(r)  # note: cb runs inside the worker process here
    return r

p = multiprocessing.Pool()
p.map(look_ma_no_cb, es)
```

or

```
for r in p.imap(f, es):
    cb(r)
```
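Applied to a setup like the one in the question, the `imap_unordered` variant might look roughly like the sketch below. Everything named here is an assumption for illustration: `run_one` is a hypothetical stand-in for `runMyODE` (it does placeholder arithmetic instead of calling `odeint`), and `collect` is a made-up helper. One thing worth noting is that a process pool has to pickle the function it sends to the workers, and a `lambda` cannot be pickled, so the sketch uses a module-level function that takes one job tuple.

```python
import multiprocessing as mp


def run_one(job):
    """Hypothetical stand-in for runMyODE: takes one (index, (a, b)) job.

    A real version would integrate the system here and return the final
    value of z instead of the placeholder product below.
    """
    index, (a, b) = job
    final_value = a * b  # placeholder arithmetic, not an ODE solve
    return index, (a, b), final_value


def collect(pool, params):
    """Run every parameter combination and gather (index, para, value)."""
    results = []
    jobs = list(enumerate(params))
    # imap_unordered yields each result as soon as it is available, so the
    # loop body plays the role of a per-element callback in the parent.
    for index, para, final_value in pool.imap_unordered(run_one, jobs):
        results.append((index, para, final_value))
    # Completion order is arbitrary; the stored index restores input order.
    return sorted(results)


if __name__ == "__main__":
    combos = [(1.0, 2.0), (3.0, 0.5), (2.0, 2.0)]
    with mp.Pool(processes=2) as pool:
        for row in collect(pool, combos):
            print(row)
```

Carrying the index inside each job is what keeps parameters and outcomes matched up even though results arrive out of order.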
