Feuermurmel Feuermurmel - 7 months ago 11
Python Question

Connect two processes started with asyncio.subprocess.create_subprocess_exec()

When starting two processes with the old school

subprocess.Popen()
API, I can easily connect standard out of one process to standard in of another process, creating a pipeline in the same way as a UNIX shell will do when connecting commands with
|
:

from subprocess import Popen, PIPE

process_1 = Popen(['ls'], stdout = PIPE)
process_2 = Popen(['wc'], stdin = process_1.stdout)

process_1.wait()
process_2.wait()


How can I accomplish the same when using the asynchronous API from
asyncio.subprocess.create_subprocess_exec()
(or similar)? This is what I tried:

from asyncio.events import get_event_loop
from asyncio.subprocess import PIPE, create_subprocess_exec

async def main():
process_1 = await create_subprocess_exec('ls', stdout = PIPE)
process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)

await process_1.wait()
await process_2.wait()

get_event_loop().run_until_complete(main())


But the second call to
create_subprocess_exec()
complains that the argument passed to
stdin
has no
fileno
(which is true):

Traceback (most recent call last):
File ".../test-async.py", line 11, in <module>
get_event_loop().run_until_complete(main())
[...]
File ".../test-async.py", line 6, in main
process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)
[...]
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/subprocess.py", line 1388, in _get_handles
p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'


How can I get the same result as in the synchronous example above?

Answer

In asyncio, process.stdout is actually a StreamReader, not a file object. The file object can be accessed through process._transport._proc.stdout. Unfortunately, you won't be able to use it since it has already been registered in the event loop in order to provide the stream interface process.stdout.

One way to deal with the issue is to create your own pipe and pass the file descriptors to the subprocess:

async def main():
    read, write = os.pipe()
    process_1 = await create_subprocess_exec('ls', stdout=write)
    os.close(write)
    process_2 = await create_subprocess_exec('wc', stdin=read, stdout=PIPE)
    return await process_2.stdout.read()

Note that you have to close the write file explicitly once the subprocess is started (it is closed automatically only if you use subprocess.PIPE).

Comments