Han-Kwang Nienhuys Han-Kwang Nienhuys - 6 months ago 180
Python Question

Minimize overhead in Python multiprocessing.Pool with numpy/scipy

I've spent several hours on different attempts to parallelize my number-crunching code, but it only gets slower when I do so. Unfortunately, the problem disappears when I try to reduce it to the example below and I don't really want to post the whole program here. So the question is: what pitfalls should I avoid in this type of program?

(Note: follow-up after Unutbu's answer is at the bottom.)

Here are the circumstances:


  • It's about a module that defines a class
    BigData
    with a lot of internal data. In the example there is one list
    ff
    of interpolation functions; in the actual program, there are more, e.g.,
    ffA[k]
    ,
    ffB[k]
    ,
    ffC[k]
    .

  • The calculation would be classified as "embarrassingly parallel": the work can be done on smaller chunks of data at a time. In the example, that's
    do_chunk()
    .

  • The approach shown in the example would result, in my actual program, in the worst performance: about 1 second per chunk (on top of 0.1 second or so of actual calculation time when done in a single thread). So, for n=50,
    do_single()
    would run in 5 seconds and
    do_multi()
    would run in 55 seconds.

  • I also tried to split up the work by slicing the
    xi
    and
    yi
    arrays into contiguous blocks and iterating over all
    k
    values in each chunk. That worked a bit better. Now there was no difference in total execution time whether I used 1, 2, 3, or 4 threads. But of course, I want to see an actual speedup!

  • This may be related: Multiprocessing.Pool makes Numpy matrix multiplication slower. However, elsewhere in the program, I used a multiprocessing pool for calculations that were much more isolated: a function (not bound to a class) that looks something like
    def do_chunk(array1, array2, array3)
    and does numpy-only calculations on that array. There, there was a significant speed boost.

  • The CPU usage scales with the number of parallel processes as expected (300% CPU usage for three threads).





#!/usr/bin/python2.7

import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm=0
def stopwatch(msg=''):
tm = time.time()
global _tm
if _tm==0: _tm = tm; return
print("%s: %.2f seconds" % (msg, tm-_tm))
_tm = tm

class BigData:
def __init__(self, n):
z = np.random.uniform(size=n*n*n).reshape((n,n,n))
self.ff = []
for i in range(n):
f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
self.ff.append(f)
self.n = n

def do_chunk(self, k, xi, yi):
s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
sys.stderr.write(".")
return s

def do_multi(self, numproc, xi, yi):
procs = []
pool = Pool(numproc)
stopwatch('Pool setup')
for k in range(self.n):
p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
procs.append(p)
stopwatch('Jobs queued (%d processes)' % numproc)
sum = 0.0
for k in range(self.n):
# Edit/bugfix: replaced p.get by procs[k].get
sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
if k == 0: stopwatch("\nFirst get() done")
stopwatch('Jobs done')
pool.close()
pool.join()
return sum

def do_single(self, xi, yi):
sum = 0.0
for k in range(self.n):
sum += self.do_chunk(k, xi, yi)
stopwatch('\nAll in single process')
return sum

def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
stopwatch()
n = 50
bd = BigData(n)
m = 1000*1000
xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
stopwatch('Initialized')
bd.do_multi(2, xi, yi)
bd.do_multi(3, xi, yi)
bd.do_single(xi, yi)


The output:


Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds


Timings are on an Intel Core i3-3227 CPU with 2 cores, 4 threads, running 64-bit Linux. For the actual program, the multi-processing version (pool mechanism, even if using only one core) was a factor 10 slower than the single-process version.

Follow-up

Unutbu's answer got me on the right track. In the actual program,
self
was pickled into a 37 to 140 MB object that needed to be passed to the worker processes. Worse, Python pickling is very slow; the pickling itself took a few seconds, which happened for each chunk of work passed to the worker processes. Other than pickling and passing big data objects, the overhead of
apply_async
in Linux is very small; for a small function (adding a few integer arguments), it takes only 0.2 ms per
apply_async
/
get
pair. So, splitting up the work in very small chunks is not a problem by itself. So, I transmit all big array arguments as indices to global variables. I keep the chunk size small for the purpose of CPU cache optimization.

The global variables are stored in a global
dict
; the entries are immediately deleted in the parent process after the worker pool is set up. Only the keys to the
dict
are transmitted to the worker procesess. The only big data for pickling/IPC is the new data that is created by the workers.



#!/usr/bin/python2.7

import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {} # global storage for objects during multiprocessing.

class BigData:
def __init__(self, size):
self.blah = np.random.uniform(0, 1, size=size)

def do_chunk(self, k, xi, yi):
# do the work and return an array of the same shape as xi, yi
zi = k*np.ones_like(xi)
return zi

def do_all_work(self, xi, yi, num_proc):
global _mproc_data
mp_key = str(id(self))
_mproc_data['bd'+mp_key] = self # BigData
_mproc_data['xi'+mp_key] = xi
_mproc_data['yi'+mp_key] = yi
pool = Pool(processes=num_proc)
# processes have now inherited the global variabele; clean up in the parent process
for v in ['bd', 'xi', 'yi']:
del _mproc_data[v+mp_key]

# setup indices for the worker processes (placeholder)
n_chunks = 45
n = len(xi)
chunk_len = n//n_chunks
i1list = np.arange(0,n,chunk_len)
i2list = i1list + chunk_len
i2list[-1] = n
klist = range(n_chunks) # placeholder

procs = []
for i in range(n_chunks):
p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
sys.stderr.write(".")
procs.append(p)
sys.stderr.write("\n")

# allocate space for combined results
zi = np.zeros_like(xi)

# get data from workers and finish
for i, p in enumerate(procs):
zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling

pool.close()
pool.join()

return zi

def _do_chunk_wrapper(key, i1, i2, k):
"""All arguments are small objects."""
global _mproc_data
bd = _mproc_data['bd'+key]
xi = _mproc_data['xi'+key][i1:i2]
yi = _mproc_data['yi'+key][i1:i2]
return bd.do_chunk(k, xi, yi)


if __name__ == "__main__":
xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
bd = BigData(int(1e7))
bd.do_all_work(xi, yi, 4)


Here are the results of a speed test (again, 2 cores, 4 threads), varying the number of worker processes and the amount of memory in the chunks (total bytes of the
xi
,
yi
,
zi
array slices). The numbers are in "million result values per second", but that doesn't matter so much for the comparison. The row for "1 process" is a direct call to
do_chunk
with the full input data, without any subprocesses.

#Proc 125K 250K 500K 1000K unlimited
1 0.82
2 4.28 1.96 1.3 1.31
3 2.69 1.06 1.06 1.07
4 2.17 1.27 1.23 1.28


The impact of data size in memory is quite significant. The CPU has 3 MB cache, but I'm not sure how that's spread over the cores and L1/L2/L3 cache. Note that the calculation also needs access to several MB of internal data of the
BigData
object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, and 3 is the slowest.

Answer

Try to reduce interprocess communication. In the multiprocessing module all (single-computer) interprocess communication done through Queues. Objects passed through a Queue are pickled. So try to send fewer and/or smaller objects through the Queue.

  • Do not send self, the instance of BigData, through the Queue. It is rather big, and gets bigger as the amount the amount of data in self grows:

    In [6]: import pickle
    In [14]: len(pickle.dumps(BigData(50)))
    Out[14]: 1052187
    

    Every time pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi)) is called, self is pickled in the main process and unpickled in the worker process. The size of len(pickle.dumps(BigData(N))) grows a N increases.

  • Let the data be read from a global variable. On Linux, you can take advantage of Copy-on-Write. As Jan-Philip Gehrcke explains:

    After fork(), parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That's [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification, the corresponding bits and pieces are copied into the memory space of the child.

    Thus, you can avoid passing instances of BigData through the Queue by simply defining the instance as a global, bd = BigData(n), (as you are already doing) and referring to its values in the worker processes (e.g. _do_chunk_wrapper). It basically amounts to removing self from the call to pool.apply_async:

    p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
    

    and accessing bd as a global, and making the necessary attendant changes to do_chunk_wrapper's call signature.

  • Try to pass longer-running functions, func, to pool.apply_async. If you have many quickly-completing calls to pool.apply_async then the overhead of passing arguments and return values through the Queue becomes a significant part of the overall time. If instead you make fewer calls to pool.apply_async and give each func more work to do before returning a result, then interprocess communication becomes a smaller fraction of the overall time.

    Below, I modified _do_chunk_wrapper to accept k_start and k_end arguments, so that each call to pool.apply_async would compute the sum for many values of k before returning a result.


import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                    for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc+1)))
        for i in range(len(ks)-1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi): 
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)        

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

yields

Initialized: 0.15 seconds

Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds

compared to the original code:

Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds
Comments