demon.mhm - 1 month ago
Python Question

How to manage complex shell processes with asyncio?

I want to track the restart of a daemon with Python's asyncio module. To do that, I need to run the shell command

tail -f -n 0 /var/log/daemon.log
and analyze its output while, say,
service daemon restart
is executing in the background. The daemon keeps writing to the log after the service restart command has finished, reporting the results of its internal checks. The tracking process reads this check information and reports whether the reboot succeeded, based on its own logic.

import asyncio
from asyncio.subprocess import PIPE, STDOUT

async def track():
    output = []
    process = await asyncio.create_subprocess_shell(
        'tail -f -n0 ~/daemon.log',
        stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    while True:
        line = await process.stdout.readline()
        if line.decode() == 'reboot starts\n':
            output.append(line)
            break
    while True:
        line = await process.stdout.readline()
        if line.decode() == '1st check completed\n':
            output.append(line)
            break
    return output

async def reboot():
    lines = [
        '...',
        '...',
        'reboot starts',
        '...',
        '1st check completed',
        '...',
    ]
    p = await asyncio.create_subprocess_shell(
        (
            'echo "rebooting"; '
            'for line in {}; '
            'do echo $line >> ~/daemon.log; sleep 1; '
            'done; '
            'echo "rebooted";'
        ).format(' '.join('"{}"'.format(l) for l in lines)),
        stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(
        asyncio.ensure_future(track()),
        asyncio.ensure_future(reboot())
    ))
    loop.close()


This code is the only way I've found to run two coroutines in parallel. But how can I start track() strictly before reboot() so that no log output is missed? And how can I retrieve the return values of both coroutines?

Answer

But how to run track() strictly before reboot to not miss any possible output in log?

You could await the creation of the first subprocess before starting the second one, so that tail has already been spawned when the restart begins.

And how to retrieve return values of both coroutines?

asyncio.gather returns the aggregated results as a list, in the same order as the awaitables passed to it.
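
To illustrate the ordering, here is a minimal sketch, assuming Python 3.7+ for asyncio.run. The names delayed and run_both are hypothetical stand-ins for the real coroutines and are not part of the question's code.

```python
import asyncio


async def delayed(value, delay):
    # Hypothetical stand-in for a real coroutine such as track() or reboot().
    await asyncio.sleep(delay)
    return value


async def run_both():
    # The first coroutine finishes last, yet its result still comes first,
    # because gather orders results by argument position, not completion time.
    return await asyncio.gather(delayed("track", 0.2), delayed("reboot", 0.1))
```

Here asyncio.run(run_both()) evaluates to ['track', 'reboot'], so the two results can be unpacked directly into two variables.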

Example:

async def main():
    process_a = await asyncio.create_subprocess_shell([...])
    process_b = await asyncio.create_subprocess_shell([...])
    return await asyncio.gather(monitor_a(process_a), monitor_b(process_b))

loop = asyncio.get_event_loop()
result_a, result_b = loop.run_until_complete(main())
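
A fuller sketch of the same idea, under stated assumptions: read_until and the parameters log_path, restart_cmd, marker and timeout are hypothetical names introduced here, and unlike the question's track() this version collects every line it sees rather than only the matching ones. Note that awaiting the creation of tail only guarantees that the process has been spawned, not that it has already opened the log file, so a restart command that writes to the log immediately could still race with it.

```python
import asyncio
from asyncio.subprocess import DEVNULL, PIPE, STDOUT


async def read_until(stream, marker):
    """Read lines from stream until one equals marker; return all lines seen."""
    seen = []
    while True:
        raw = await stream.readline()
        if not raw:  # EOF: the writer closed the stream before the marker appeared
            return seen
        line = raw.decode().rstrip("\n")
        seen.append(line)
        if line == marker:
            return seen


async def main(log_path, restart_cmd, marker, timeout=30.0):
    # Spawn the tracker first, so tail exists before the restart begins.
    tail = await asyncio.create_subprocess_exec(
        "tail", "-f", "-n", "0", log_path, stdout=PIPE, stderr=DEVNULL
    )
    try:
        # Only now launch the restart command (supplied by the caller).
        restart = await asyncio.create_subprocess_shell(
            restart_cmd, stdout=PIPE, stderr=STDOUT
        )
        # gather returns the results in argument order; the timeout keeps
        # the tracker from waiting forever if the marker never shows up.
        tracked, (restart_out, _) = await asyncio.wait_for(
            asyncio.gather(
                read_until(tail.stdout, marker),
                restart.communicate(),
            ),
            timeout,
        )
        return tracked, restart_out.decode().splitlines()
    finally:
        # tail -f never exits on its own, so stop it explicitly.
        if tail.returncode is None:
            tail.kill()
        await tail.wait()
```

It could be called as tracked, restart_output = asyncio.run(main(path, command, '1st check completed')), with path and command chosen by the caller. Using create_subprocess_exec for tail avoids shell quoting of the log path; the restart command still goes through the shell, as in the question.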