mepmerp mepmerp -4 years ago 120
Python Question

Python multiprocessing subprocesses with ordered printing?

I'm trying to run some Python functions in parallel, and these functions have print calls throughout. I want each subprocess, all running the same function, to write to the main stdout in a grouped manner: a subprocess's output should be printed only after it has finished its task. If an error occurs along the way, I still want to print whatever the subprocess produced up to that point.

A small example:

from time import sleep
import multiprocessing as mp


def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)


if __name__ == '__main__':
    pool = mp.Pool()

    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)

    for job in jobs:
        job.wait()


This runs in parallel, but the output is:

foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4


What I want is:

foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4


The particular order of the processes doesn't matter, as long as the output of each subprocess is grouped together. Interestingly enough, I get my desired output if I do

python test.py > output


I know that the subprocesses do not get their own stdout; they all share the main stdout. I've thought about and looked up some solutions, such as using a Queue: each subprocess gets its own stdout, and when it is done, an overridden flush method sends the collected output back through the Queue, where the main process can read it. Although this gives me the grouping I want, I cannot retrieve the output if the function stops halfway through; it is only delivered when the function completes successfully. I got this idea from the question "Access standard output of a sub process in python".

I've also seen the use of locks, which works, but it completely kills running the function in parallel, since each subprocess has to wait for the previous one to finish executing the function foo.

Also, if possible, I'd like to avoid changing the implementation of my foo function, as I have many functions that I would need to change.

EDIT: I have looked into the libraries dispy and Parallel Python. Dispy does exactly what I want: it keeps a separate stdout/stderr that I can just print out at the end. The problem with dispy is that I have to manually run the server in a separate terminal, and I want to be able to run my Python program in one go without first starting another script. Parallel Python also does what I want, but it seems to offer less control and has some annoyances. In particular, when you print the output, it also prints the return value of the function, whereas I only want what I printed with print. Also, when running a function, you have to give it a list of the modules it uses, which is slightly annoying, since I do not want to maintain a big list of imports just to run a simple function.

Answer Source

As you've noticed, using a lock in this case would kill multiprocessing, because all the processes would essentially wait for a mutex release from the process that currently holds the 'rights' to STDOUT. However, running in parallel and printing in sync with your function/subprocess are mutually exclusive.
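To make the cost of the lock concrete, here is a minimal sketch of that approach. The names init_worker and stdout_lock are made up for this illustration, and the lock is handed to the workers through the Pool initializer because it cannot be passed as a regular task argument. Each worker holds the lock for its whole run, so the output comes out grouped, but the total run time should be close to what you'd get by calling foo() four times in a row:

```python
import multiprocessing as mp
import sys
import time


def init_worker(lock):
    # hypothetical helper: store the shared lock as a global in each worker
    global stdout_lock
    stdout_lock = lock


def foo(x):
    # holding the lock for the whole function groups the output,
    # but it also means only one worker makes progress at a time
    with stdout_lock:
        print("foo")
        for i in range(3):
            print("Process {}: in foo {}".format(x, i))
            time.sleep(0.05)
        sys.stdout.flush()  # flush before another worker may start printing
    return x


if __name__ == "__main__":
    start = time.time()
    pool = mp.Pool(4, initializer=init_worker, initargs=(mp.Lock(),))
    pool.map(foo, range(4))
    pool.close()
    pool.join()
    print("elapsed: {:.2f}s".format(time.time() - start))
```

If you move the sleep outside the with block, the workers overlap again, but so does their output, which is exactly the trade-off described above.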

What you can do instead is have your main process serve as a 'printer' for your subprocesses: once a subprocess finishes or errors, then and only then does it send back to the main process what to print. You seem to be perfectly content with printing not being 'real time' (nor could it be, as mentioned above), so that approach should serve you just right. So:

import multiprocessing as mp
import random  # just to add some randomness
from time import sleep

def foo(x):
    output = ["[Process {}]: foo:".format(x)]
    for i in range(5):
        output.append('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
    return "\n".join(output)

if __name__ == '__main__':
    pool = mp.Pool(4)
    for res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response:")
        print(res)  # this will print as soon as one of the processes finishes/errors
    pool.close()

Which will give you (YMMV, of course):

[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4

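You can return anything else the same way, including error details, provided foo() catches its own exceptions; an exception that escapes foo() is re-raised in the main process when its result is fetched, and the lines collected so far are lost.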
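As a rough sketch of what returning errors through the same channel could look like, the variant below catches the exception inside foo() and appends the traceback to the collected lines. The failure condition (x == 1 and i == 1) is invented purely for illustration:

```python
import multiprocessing as mp
import traceback


def foo(x):
    """Collect output lines; on failure, append the traceback instead of raising."""
    output = ["[Process {}]: foo:".format(x)]
    try:
        for i in range(3):
            output.append("[Process {}] in foo {}".format(x, i))
            if x == 1 and i == 1:  # hypothetical failure, for illustration only
                raise ValueError("something went wrong in process {}".format(x))
    except Exception:
        # keep what was collected so far and add the error details
        output.append(traceback.format_exc().rstrip())
    return "\n".join(output)


if __name__ == "__main__":
    pool = mp.Pool(2)
    for res in pool.imap_unordered(foo, range(2)):
        print("[MAIN]: Process finished, response:")
        print(res)
    pool.close()
    pool.join()
```

The price is that foo() itself has to be written this way, which is what the wrapper approach further down avoids.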

UPDATE - If you absolutely have to use functions whose output you cannot control, you can wrap your subprocesses and capture their STDOUT/STDERR instead, and then once they are done (or raise an exception) you can return everything back to the process 'manager' for printing to the actual STDOUT. With such a setup, we can have foo() like:

def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # 1/16 chance per iteration, roughly 1/4 over the whole loop
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it

Notice that it's blissfully unaware of something trying to mess with its mode of operation. We'll then create an external general-purpose wrapper (so you don't have to change it for each function) to actually mess with its default behavior (and not just this function's, but that of everything else it might call while running):

def std_wrapper(args):
    try:
        from StringIO import StringIO  # ... for Python 2.x compatibility
    except ImportError:
        from io import StringIO
    import sys
    sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/err with our buffers
    # args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
    process_name = args[0]
    process_args = args[1] if len(args) > 1 else []
    process_kwargs = args[2] if len(args) > 2 else {}
    # get our method from its name, assuming global namespace of the current module/script
    process = globals()[process_name]
    response = None  # in case a call fails
    try:
        response = process(*process_args, **process_kwargs)  # call our process function
    except Exception as e:  # too broad but good enough as an example
        print(e)
    # rewind our buffers:
    sys.stdout.seek(0)
    sys.stderr.seek(0)
    # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
    return sys.stdout.read(), sys.stderr.read(), response

Now all we need is to call this wrapper instead of the desired foo(), and provide it with information on what to call on our behalf:

if __name__ == '__main__':
    pool = mp.Pool(4)
    # since we're wrapping the process we're calling, we need to send to the wrapper packed
    # data with instructions on what to call on our behalf.
    # info on args packing available in the std_wrapper function above.
    for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # strip the trailing newline for niceness, print err if you want
    pool.close()

So now if you run it, you'll get something like this:

[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!

This works even though foo() just prints away or raises an error. Of course, you can use such a wrapper to call any function and pass any number of args/kwargs to it.
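For instance, passing keyword arguments only requires a third element in each packed tuple. The helper pack_call below is not part of the wrapper, just a hypothetical convenience for building tuples in the format std_wrapper unpacks, and the verbose keyword is made up (the foo() above does not accept it):

```python
def pack_call(func_name, *args, **kwargs):
    """Hypothetical helper: build the (name, args, kwargs) tuple std_wrapper unpacks."""
    return (func_name, list(args), dict(kwargs))


# each entry describes one call, e.g. foo(0, verbose=True), foo(1, verbose=True), ...
tasks = [pack_call("foo", i, verbose=True) for i in range(4)]
print(tasks[0])  # → ('foo', [0], {'verbose': True})
```

You would then pass tasks to pool.imap_unordered(std_wrapper, tasks) in place of the inline list.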

UPDATE #2 - But wait! If we can wrap our function processes like this, and have their STDOUT/STDERR captured, we surely can turn this into a decorator and use it in our code with a simple decoration. So, for my final proposal:

import functools
import multiprocessing
import random  # just to add some randomness
import time

def std_wrapper(func):
    @functools.wraps(func)  # we need this to unravel the target function name
    def caller(*args, **kwargs):  # and now for the wrapper, nothing new here
        try:
            from StringIO import StringIO  # ... for Python 2.x compatibility
        except ImportError:
            from io import StringIO
        import sys
        sys.stdout, sys.stderr = StringIO(), StringIO()  # use our buffers instead
        response = None  # in case a call fails
        try:
            response = func(*args, **kwargs)  # call our wrapped process function
        except Exception as e:  # too broad but good enough as an example
            print(e)  # NOTE: the exception is also printed to the captured STDOUT
        # rewind our buffers:
        sys.stdout.seek(0)
        sys.stderr.seek(0)
        # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
        return sys.stdout.read(), sys.stderr.read(), response
    return caller

@std_wrapper  # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        time.sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # 1/16 chance per iteration, roughly 1/4 over the whole loop
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it

And now we can call our wrapped functions as before without dealing with argument packing or anything of the sort, so we're back at:

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    for out, err, res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # strip the trailing newline for niceness, print err if you want
    pool.close()

The output is the same as in the previous example, but in a much nicer and more manageable package.
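If you only need Python 3, a possible variation (a sketch, not a drop-in replacement) is to let contextlib.redirect_stdout and contextlib.redirect_stderr do the stream swapping. They restore the real streams when the block exits, and the full traceback can go to the captured STDERR instead of STDOUT. The decorator name capture_output and the failure condition in foo() are assumptions made for this example:

```python
import contextlib
import functools
import io
import multiprocessing
import traceback


def capture_output(func):
    """Run func with stdout/stderr redirected into in-memory buffers (Python 3 only)."""
    @functools.wraps(func)  # keeps the name 'foo' so the pool can still pickle it
    def caller(*args, **kwargs):
        out, err = io.StringIO(), io.StringIO()
        response = None  # in case the call fails
        # the redirect_* context managers restore the real streams on exit
        with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
            try:
                response = func(*args, **kwargs)
            except Exception:
                traceback.print_exc()  # written to the captured stderr
        return out.getvalue(), err.getvalue(), response
    return caller


@capture_output
def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(3):
        print("[Process {}] in foo {}".format(x, i))
        if x == 2 and i == 1:  # hypothetical failure, for illustration only
            raise RuntimeError("[Process {}] failed on purpose".format(x))
    return x * 10


if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for out, err, res in pool.imap_unordered(foo, range(4)):
            print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
            print(out.rstrip())
            if err:
                print("[MAIN]: STDERR:")
                print(err.rstrip())
```

Keeping the traceback in err rather than out makes it easier for the main process to tell a failed call from a successful one that happened to return None.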
