John John - 6 months ago 16
Python Question

How to process sub-batches of asynchronous jobs?

I have a group of async jobs (roughly 100) that I want to run in batches of five, using subprocess.Popen for each job. My plan would be:

  1. Execute the first five jobs from the list of jobs

  2. Poll the active jobs each minute or so (each job takes a few minutes to run)

  3. If a job is done, execute the next job, always guaranteeing that we're running five jobs at a time

  4. Continue until we've gone through the entire job list

Is there a known pattern for doing this in Python?


In Python 2, I've used a combination of multiprocessing.Pool and subprocess for this. That works, but it adds the overhead of the pool's worker processes.

So in Python 3 I use a concurrent.futures.ThreadPoolExecutor instead of a multiprocessing.Pool. Threads are enough here, because each worker only waits for its subprocess to finish.

The code fragment below shows how to use the ThreadPoolExecutor:

import concurrent.futures as cf
import logging
import os

errmsg = 'conversion of track {} failed, return code {}'
okmsg = 'finished track {}, "{}"'
num = len(data['tracks'])
with cf.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
    fl = [tp.submit(runflac, t, data) for t in range(num)]
    for fut in cf.as_completed(fl):
        idx, rv = fut.result()
        if rv == 0:
            logging.info(okmsg.format(idx+1, data['tracks'][idx]))
        else:
            logging.error(errmsg.format(idx+1, rv))

The runflac function uses subprocess to call flac(1) to convert music files:

import subprocess

def runflac(idx, data):
    """Use the flac(1) program to convert a music file to FLAC format.

    Arguments:
        idx: track index (starts from 0)
        data: album data dictionary

    Returns:
        A tuple containing the track index and return value of flac.
    """
    num = idx + 1
    ifn = 'track{:02d}.cdda.wav'.format(num)
    args = ['flac', '--best', '--totally-silent',
            '-TARTIST=' + data['artist'], '-TALBUM=' + data['title'],
            '-TTITLE=' + data['tracks'][idx],
            '-TTRACKNUM={:02d}'.format(num), '-o',
            'track{:02d}.flac'.format(num), ifn]
    # subprocess.call blocks until flac exits and returns its exit status.
    rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return (idx, rv)
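
Applied to the question's case (many jobs, five at a time), the same pattern could be sketched as follows. This is only a minimal illustration: run_job, run_all and the placeholder commands are names made up for this example, and the jobs simply start a Python interpreter that exits with a known status.

```python
import concurrent.futures as cf
import subprocess
import sys


def run_job(idx, cmd):
    """Run one command (hypothetical helper); return (index, return code)."""
    rv = subprocess.call(cmd, stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return (idx, rv)


def run_all(commands, max_workers=5):
    """Run every command, with at most max_workers running at a time.

    Returns a dict that maps the job index to its return code.
    """
    results = {}
    with cf.ThreadPoolExecutor(max_workers=max_workers) as tp:
        futures = [tp.submit(run_job, i, c) for i, c in enumerate(commands)]
        # as_completed yields each future as soon as its job has finished.
        for fut in cf.as_completed(futures):
            idx, rv = fut.result()
            results[idx] = rv
    return results


# Placeholder jobs (an assumption for this sketch): replace these with
# the real job commands.
commands = [[sys.executable, '-c', 'import sys; sys.exit({})'.format(i % 2)]
            for i in range(12)]
results = run_all(commands, max_workers=5)
```

The executor starts a new job as soon as a worker thread becomes free, so no explicit polling is needed with this approach.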


In Python 2.7, there is another technique that is slightly more complicated but avoids the overhead of using a multiprocessing pool.

The basic form is:

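starter = functools.partial(startencoder, crf=args.crf, preset=args.preset)
procs = []
maxprocs = cpu_count()
for ifile in args.files:
    # Wait for a free slot before starting the next conversion.
    while len(procs) == maxprocs:
        manageprocs(procs)
    procs.append(starter(ifile))
# Wait for the remaining conversions to finish.
while len(procs) > 0:
    manageprocs(procs)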

(Using functools.partial is a way to set default arguments for a function. It is not relevant to the principle.) The startencoder function is basically a wrapper around subprocess.Popen, but it returns some extra information besides the Popen instance:

import logging
import os
import subprocess


def startencoder(fname, crf, preset):
    """
    Use ffmpeg to convert a video file to H.264/AAC streams in an MP4
    container.

    Arguments:
        fname: Name of the file to convert.
        crf: Constant rate factor. See ffmpeg docs.
        preset: Encoding preset. See ffmpeg docs.

    Returns:
        A 3-tuple of a Process, input path and output path.
    """
    basename, ext = os.path.splitext(fname)
    known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv',
             '.mkv', '.webm']
    if ext.lower() not in known:
        ls = "File {} has unknown extension, ignoring it.".format(fname)
        logging.warning(ls)
        return (None, fname, None)
    ofn = basename + '.mp4'
    args = ['ffmpeg', '-i', fname, '-c:v', 'libx264', '-crf', str(crf),
            '-preset', preset, '-flags',  '+aic+mv4', '-c:a', 'libfaac',
            '-sn', '-y', ofn]
    try:
        p = subprocess.Popen(args, stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        logging.info("Conversion of {} to {} started.".format(fname, ofn))
    except OSError:
        # Return None as the process so the caller can drop this entry.
        p = None
        logging.error("Starting conversion of {} failed.".format(fname))
    return (p, fname, ofn)

What is important is the manageprocs function:

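import time

def manageprocs(proclist):
    """
    Check a list of subprocesses tuples for processes that have ended and
    remove them from the list.

    Arguments:
        proclist: a list of (process, input filename, output filename)
    """
    nr = '# of conversions running: {}\r'.format(len(proclist))
    logging.info(nr)
    for p in proclist[:]:  # iterate over a copy; entries are removed below
        pr, ifn, ofn = p
        if pr is None:
            proclist.remove(p)
        elif pr.poll() is not None:
            logging.info('Conversion of {} to {} finished.'.format(ifn, ofn))
            proclist.remove(p)
    # Pause briefly so that the calling loop does not busy-wait.
    time.sleep(0.5)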
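
For the question's case of five jobs at a time, the same polling idea can be condensed into one self-contained sketch. The names run_batched, maxprocs and poll_interval are assumptions made for this illustration, and the placeholder commands only start a Python interpreter that exits with a known status. For jobs that take minutes, a much longer poll_interval would be appropriate.

```python
import subprocess
import sys
import time


def run_batched(commands, maxprocs=5, poll_interval=0.1):
    """Run commands with at most maxprocs of them running at once.

    Returns a list of return codes in the same order as commands.
    """
    pending = list(enumerate(commands))
    running = []  # list of (index, Popen) pairs
    returncodes = [None] * len(commands)
    while pending or running:
        # Start new jobs until maxprocs are running or none are left.
        while pending and len(running) < maxprocs:
            idx, cmd = pending.pop(0)
            proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL,
                                    stderr=subprocess.DEVNULL)
            running.append((idx, proc))
        # poll() returns None while a process is still running.
        still_running = []
        for idx, proc in running:
            rv = proc.poll()
            if rv is None:
                still_running.append((idx, proc))
            else:
                returncodes[idx] = rv
        running = still_running
        if running:
            # Sleep between polls so the loop does not busy-wait.
            time.sleep(poll_interval)
    return returncodes


# Placeholder jobs (an assumption for this sketch): replace these with
# the real job commands.
commands = [[sys.executable, '-c', 'import sys; sys.exit({})'.format(i % 3)]
            for i in range(8)]
codes = run_batched(commands, maxprocs=5)
```

Building a new list of running jobs on each pass avoids removing items from a list while iterating over it.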