Daniel Standage - 5 months ago
Python Question

Streaming wrapper around program that writes to multiple output files

There is a program (which I cannot modify) that creates two output files. I am trying to write a Python wrapper that invokes this program, reads both output streams simultaneously, combines the output, and prints to stdout (to facilitate streaming). How can I do this without deadlocking? The proof of concept below works fine, but when I apply the approach to the actual program it deadlocks.

Proof of concept: this is a dummy program, `bogus.py`, that creates two output files like the program I'm trying to wrap.

    #!/usr/bin/env python
    from __future__ import print_function
    import sys
    with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
        for i in range(1000):
            if i % 2 == 0:
                print(i, file=f1)
            else:
                print(i, file=f2)


And here is the Python wrapper that invokes the program and combines its two outputs (interleaving 4 lines from each at a time).

    #!/usr/bin/env python
    from __future__ import print_function
    from contextlib import contextmanager
    import os
    import shutil
    import subprocess
    import tempfile

    @contextmanager
    def named_pipe():
        """
        Create a temporary named pipe.

        Stolen shamelessly from StackOverflow:
        http://stackoverflow.com/a/28840955/459780
        """
        dirname = tempfile.mkdtemp()
        try:
            path = os.path.join(dirname, 'named_pipe')
            os.mkfifo(path)
            yield path
        finally:
            shutil.rmtree(dirname)

    with named_pipe() as f1, named_pipe() as f2:
        cmd = ['./bogus.py', f1, f2]
        child = subprocess.Popen(cmd)
        with open(f1, 'r') as in1, open(f2, 'r') as in2:
            buff = list()
            for i, lines in enumerate(zip(in1, in2)):
                line1 = lines[0].strip()
                line2 = lines[1].strip()
                print(line1)
                buff.append(line2)
                if len(buff) == 4:
                    for line in buff:
                        print(line)
                    buff = list()  # reset the buffer after flushing it

Answer

> I'm seeing big chunks of one file and then big chunks of the other file, regardless of whether I write to stdout, stderr, or tty.
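That chunking is stdio block buffering: writes to a regular file are flushed in multi-kilobyte blocks, not per line. For contrast, if the child could be modified (here a hypothetical Python child, not the asker's actual program), opening its output files line-buffered makes each line visible to other readers as soon as it is written:

```python
import os
import tempfile

fd, path = tempfile.mkstemp()
os.close(fd)

out = open(path, 'w', buffering=1)  # buffering=1: line-buffered text mode
out.write('first line\n')           # the trailing newline triggers a flush
visible = open(path).read()         # another reader sees the line immediately
out.close()
os.remove(path)
```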

If you can't make the child use line buffering for its output files, then a simple solution that reads complete interleaved lines while the process is still running, as soon as output becomes available, is to use threads:

#!/usr/bin/env python2
from subprocess import Popen
from threading import Thread
from Queue import Queue

def readlines(path, queue):
    try:
        with open(path) as file:
            for line in file:
                queue.put(line)
    finally:
        queue.put(None)

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queue = Queue()
    for path in paths:
        Thread(target=readlines, args=[path, queue]).start()
    for _ in paths:
        for line in iter(queue.get, None):
            print line.rstrip('\n')

where named_pipes(n) is defined here.
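The linked definition is not reproduced in this answer; a minimal version, generalizing the question's `named_pipe` context manager to `n` pipes, could look like this (a sketch, not necessarily the linked code):

```python
import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n):
    # create n FIFOs in a fresh temporary directory, removed on exit
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'pipe{}'.format(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)
```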

pipe.readline() is broken for non-blocking pipes on Python 2; that is why threads are used here.


To print a line from one file followed by a line from another:

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queues = [Queue() for _ in paths]
    for path, queue in zip(paths, queues):
        Thread(target=readlines, args=[path, queue]).start()
    while queues:
        for q in queues[:]:  # iterate over a copy so removal on EOF is safe
            line = q.get()
            if line is None:  # EOF
                queues.remove(q)
            else:
                print line.rstrip('\n')

If child.py writes more lines to one file than to the other, the difference is kept in memory, so the individual queues in queues may grow without bound until they fill all available memory. You can set a maximum number of items in a queue, but then you have to pass a timeout to q.get(); otherwise the code may deadlock.
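To make that bound concrete, here is a sketch using bounded queues plus a `get()` timeout. In-memory lists stand in for the pipe-reading threads, and the 5-second stall limit is an arbitrary choice for illustration:

```python
from queue import Queue, Empty
from threading import Thread

def readlines_bounded(lines, queue):
    # producer: blocks once maxsize items are waiting; None signals EOF
    for line in lines:
        queue.put(line)
    queue.put(None)

# bounded queues cap memory use; the consumer must then use a timeout so a
# stalled producer surfaces as an exception instead of a silent deadlock
queues = [Queue(maxsize=100) for _ in range(2)]
sources = [['%d\n' % i for i in range(0, 10, 2)],   # evens
           ['%d\n' % i for i in range(1, 10, 2)]]   # odds
for lines, q in zip(sources, queues):
    Thread(target=readlines_bounded, args=[lines, q]).start()

merged = []
live = queues[:]
while live:
    for q in live[:]:  # iterate over a copy so removal on EOF is safe
        try:
            line = q.get(timeout=5)  # hypothetical 5-second stall limit
        except Empty:
            raise RuntimeError('producer stalled')
        if line is None:  # EOF
            live.remove(q)
        else:
            merged.append(line.rstrip('\n'))
```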


If you need to print exactly 4 lines from one output file, then exactly 4 lines from the other, and so on, you can slightly modify the code example above:

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queues = [Queue() for _ in paths]
    for path, queue in zip(paths, queues):
        Thread(target=readlines, args=[path, queue]).start()
    while queues:
        # print 4 lines from one queue followed by 4 lines from another queue
        for q in queues[:]:  # iterate over a copy so removal on EOF is safe
            for _ in range(4):
                line = q.get()
                if line is None:  # EOF
                    queues.remove(q)
                    break
                else:
                    print line.rstrip('\n')

It won't deadlock, but it may eat all available memory if your child process writes much more data into one file than into the other. Only the difference is kept in memory, so as long as the files stay roughly in step, the program supports arbitrarily large output files.
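Putting the pieces together, here is a self-contained Python 3 sketch of the 4-lines-at-a-time variant. The inline `CHILD` script stands in for the real program, and the `merge()` wrapper is an illustrative reworking, not the exact code above (POSIX-only, since it relies on `os.mkfifo`):

```python
import os
import shutil
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from queue import Queue
from threading import Thread

@contextmanager
def named_pipes(n):
    # n FIFOs in a throwaway directory, removed on exit
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'pipe%d' % i) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

def readlines(path, queue):
    try:
        with open(path) as file:
            for line in file:
                queue.put(line)
    finally:
        queue.put(None)  # EOF marker

# stand-in for the real program: evens to argv[1], odds to argv[2]
CHILD = """\
import sys
with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
    for i in range(1000):
        print(i, file=f1 if i % 2 == 0 else f2)
"""

def merge(group=4):
    merged = []
    with named_pipes(2) as paths:
        child = subprocess.Popen([sys.executable, '-c', CHILD] + paths)
        queues = [Queue() for _ in paths]
        for path, queue in zip(paths, queues):
            Thread(target=readlines, args=[path, queue]).start()
        while queues:
            for q in queues[:]:  # copy: we remove queues on EOF
                for _ in range(group):
                    line = q.get()
                    if line is None:  # EOF
                        queues.remove(q)
                        break
                    merged.append(line.rstrip('\n'))
        child.wait()
    return merged
```

Running `merge()` yields the two streams interleaved four lines at a time: four even numbers, then four odd numbers, and so on.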