
Fast Queue of read only numpy arrays

I have a multiprocessing job where I'm queuing read-only numpy arrays as part of a producer-consumer pipeline.

Currently they're being pickled, because that is the default behaviour of multiprocessing.Queue, and this slows down performance.

Is there any pythonic way to pass references to shared memory instead of pickling the arrays?

Unfortunately the arrays are being generated after the consumer is started, and there is no easy way around that. (So the global variable approach would be ugly...).

[Note that in the following code we are not expecting h(x0) and h(x1) to be computed in parallel. Instead we see h(x0) and g(h(x1)) computed in parallel (like pipelining in a CPU).]

from multiprocessing import Process, Queue
import numpy as np

class __EndToken(object):
    pass

def parrallel_pipeline(buffer_size=50):
    def parrallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parrallel_pipeline_with_args


@parrallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parrallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parrallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

if __name__ == "__main__":
    rs = f(g(h(xs())))
    for r in rs:
        print r

Answer

Sharing memory between threads or processes

Use threading instead of multiprocessing

Since you're using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here's a version of your code, tweaked to use threading.Thread and Queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. This passes a numpy ndarray via a queue without pickling it. On my computer, this runs about 3 times faster than your code. (However, it's only about 20% faster than the serial version of your code. I have suggested some other approaches further down.)

from threading import Thread
from Queue import Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_thread = Thread(target=consumer, args=(f_xs, q,))
            consumer_thread.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs)  # 12.2s
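
As an aside, if you want to convince yourself that numpy really does release the GIL during these computations, here is a minimal standalone sketch (not part of the pipeline; the matrix size and the use of np.dot are just illustrative choices) that times two large matrix products run back-to-back versus in two threads:

import threading, time
import numpy as np

a = np.random.uniform(0, 1, (2000, 2000))

def work():
    # np.dot releases the GIL while the underlying BLAS routine runs
    np.dot(a, a)

# two matrix products, one after the other
t0 = time.time()
work(); work()
print "sequential: %.2fs" % (time.time() - t0)

# the same two products in two threads; if the GIL were held throughout,
# this would take about as long as the sequential run
t0 = time.time()
threads = [threading.Thread(target=work) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print "threaded:   %.2fs" % (time.time() - t0)

On a machine with at least two free cores the threaded run should finish in well under the sequential time, which it could not do if the GIL were held for the whole computation. (If your BLAS library is itself multithreaded, the difference will be smaller.)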

Store numpy arrays in shared memory

Another option, closer to what you requested, would be to continue using the multiprocessing package, but pass data between processes using arrays stored in shared memory. The code below can do that. Before spawning subprocesses, it creates a pool of numpy arrays backed by shared memory. The workers retrieve a free array, copy their results into it, then put the id of the array (not the array itself) onto the queue. This is much faster than pushing the whole array onto the queue, since it avoids pickling the arrays. This has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e., you run a lot of python code in the functions). However, this is more complex than your version or the threaded version shown above.
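
Before the full listing, it may help to see the core trick in isolation: a multiprocessing.Array can be wrapped in a numpy array with np.frombuffer, and, assuming a fork-based start method (which the full listing also relies on, since it creates the pool at module level), a child process sees the same memory, so no array data ever has to be pickled. A minimal sketch, with purely illustrative names and sizes:

from multiprocessing import Process, Array
import numpy as np

# raw shared-memory buffer wrapped as a numpy array (6 float64 values)
buf = Array('d', 6, lock=False)
shared = np.frombuffer(buf, dtype=np.float64).reshape(2, 3)

def worker():
    # after fork, the child inherits the same buffer, so this write is
    # visible to the parent without any data passing through a queue
    shared[:] = 42.0

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    p.join()
    print shared   # all 42.0 -- the parent sees the child's writes, no pickling

The full version below builds a pool of such arrays and passes array ids, rather than the arrays themselves, through the queues.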

from multiprocessing import Process, Queue, Array
import numpy as np

# how many shared-memory arrays will be needed?
n_arrays = 12

# find the size and data type for the arrays
# note: every @parallel_pipeline function must accept and yield arrays of this size
template = np.random.uniform(0,1,(500,2000))
dtype = template.dtype
shape = template.shape
byte_count = len(template.data)
del template

# make a pool of numpy arrays, each backed by shared memory, 
# and a queue to keep track of which ones are free
array_pool = [None] * n_arrays
avail_arrays = Queue(n_arrays)
for i in range(n_arrays):
    buf = Array('c', byte_count, lock=False)
    array_pool[i] = np.frombuffer(buf, dtype=dtype).reshape(shape)
    avail_arrays.put(i)

class __EndToken(object):
    pass

# note: the function result is copied into shared memory in consumer()
# and copied out of shared memory in parallel_generator(). 
# The consumer() copy could be avoided if f(x) directly assigned the 
# computation result to a shared-memory array (using code like consumer()).
# The parallel_generator() copy could be avoided if the user of the value
# always put the array id back into the avail_arrays queue.

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                # get the ID of an available shared-memory array
                id = avail_arrays.get()
                # copy x to the shared-memory array
                array_pool[id][:] = x
                # put the array's id (not the whole array) onto the queue
                q.put(id)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                # get the id of the array holding the next value
                id = q.get()
                if isinstance(id, __EndToken):
                    break
                # copy the array 
                x = array_pool[id].copy()
                # put the shared-memory array back into the pool
                avail_arrays.put(id)
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs()))))   # 14.0s

Parallel processing of samples instead of functions

The code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version shown below). That is because each function is run in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran %time print sum(1 for x in xs()).

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may be distributed better among processors, and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code below (both threading and multiprocessing versions are shown):

import multiprocessing
import threading, Queue
import numpy as np

def f(x):
    return x + 1.0

def g(x):
    return x * 3

def h(x):
    return x * x

def final(i):
    return f(g(h(x(i))))

def final_sum(i):
    return f(g(h(x(i)))).sum()

def x(i):
    # produce sample number i
    return np.random.uniform(0, 1, (500, 2000))

def rs_serial(func, n):
    for i in range(n):
        yield func(i)

def rs_parallel_threaded(func, n):
    todo = range(n)
    q = Queue.Queue(2*n_workers)

    def worker():
        while True:
            try:
                # the global interpreter lock ensures only one thread does this at a time
                i = todo.pop()
                q.put(func(i))
            except IndexError:
                # none left to do
                q.put(None)
                break

    threads = []
    for j in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon=False
        threads.append(t)   # in case it's needed later
        t.start()

    while True:
        x = q.get()
        if x is None:
            break
        else:
            yield x

def rs_parallel_mp(func, n):
    pool = multiprocessing.Pool(n_workers)
    return pool.imap_unordered(func, range(n))

n_workers = 4
n_samples = 1000

print "serial:"  # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:"  # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))

print "mp return arrays:"  # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:"  # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))

The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version. That's not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?

The multiprocessing version performs significantly faster than your original multiprocessing code, primarily because all the functions get chained together in a single process, rather than queueing (and pickling) intermediate results. It is still slower than the serial version, though, because all the result arrays have to be pickled (in the worker process) and unpickled (in the main process) before being returned by imap_unordered. However, if you can arrange for your pipeline to return aggregate results instead of the complete arrays, then you avoid that pickling overhead, and the multiprocessing version is fastest: about 43% faster than the serial version.
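
If you want a rough sense of what the pickling itself costs per array, independent of the rest of the pipeline, you can time a pickle round-trip on a single sample. This is just a back-of-the-envelope sketch (cPickle with the highest protocol is an assumption; multiprocessing's exact serialization path differs slightly, and the numbers will vary by machine):

import cPickle as pickle
import time
import numpy as np

x = np.random.uniform(0, 1, (500, 2000))   # same shape as the samples above

t0 = time.time()
for _ in range(100):
    pickle.loads(pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL))
print "pickle round-trip: %.1f ms per array" % (10.0 * (time.time() - t0))

Multiply that per-array cost by the number of arrays crossing a process boundary and you get a feel for the overhead that the shared-memory and aggregate-result approaches avoid.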

OK, now for the sake of completeness, here's a version of the second example that uses multiprocessing with your original generator functions instead of the finer-scale functions shown above. This uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-scale functions, and this method can get you up to a 54% speedup vs. the serial version shown above. However, that speedup is only available if you don't need to return the full arrays from the worker functions.

import multiprocessing, itertools, math
import numpy as np

def f(xs):
    for x in xs:
        yield x + 1.0

def g(xs):
    for x in xs:
        yield x * 3

def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

def final():
    return f(g(h(xs())))

def final_sum():
    for x in f(g(h(xs()))):
        yield x.sum()

def get_chunk(args):
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list. 
    This runs in a worker process and does all the computation."""
    return list(itertools.islice(args[0](), args[1]))

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""

    pool = multiprocessing.Pool(n_workers)

    # how many chunks will be needed to yield at least max_items items?
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))

    # generate a suitable series of arguments for get_chunk()
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks)

    # chunk_gen will yield a series of chunks (lists of results) from the generator function, 
    # totaling n_chunks * chunk_size items (which is >= max_items)
    chunk_gen = pool.imap_unordered(get_chunk, args_list)

    # parallel_gen flattens the chunks, and yields individual items
    parallel_gen = itertools.chain.from_iterable(chunk_gen)

    # limit the output to max_items items 
    return itertools.islice(parallel_gen, max_items)


# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:"  # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:"  # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))


# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:"  # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:"  # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))