NarūnasK - 1 month ago
Python Question

asyncio terminate subprocess on timeout

I have a script that must execute some shell commands. However, if a command takes too long to complete, it must be forcibly killed. Consider the following code snippet:

import asyncio, random

q = asyncio.Queue()

MAX_WAIT = 5

@asyncio.coroutine
def blocking_task(sec):
    print('This task will sleep {} sec.'.format(sec))
    create = asyncio.create_subprocess_shell(
        'sleep {s}; echo "Woke up after {s} sec." >> ./tst.log'.format(s=sec),
        stdout=asyncio.subprocess.PIPE)
    proc = yield from create
    yield from proc.wait()

@asyncio.coroutine
def produce():
    while True:
        q.put_nowait(random.randint(3,8))
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        try:
            yield from asyncio.wait_for(blocking_task(value), MAX_WAIT)
        except asyncio.TimeoutError:
            print('~/~ Job has been cancelled !!')
        else:
            print('=/= Job has been done :]')


loop = asyncio.get_event_loop()
asyncio.ensure_future(produce())
asyncio.ensure_future(consume())
loop.run_forever()


This code produces the following output:

This task will sleep 4 sec.
=/= Job has been done :]
This task will sleep 8 sec.
~/~ Job has been cancelled !!
This task will sleep 5 sec.
~/~ Job has been cancelled !!


So it seems to be working as expected: jobs are stopped if they take too long to finish. But when I check the log, I can see that the time-consuming tasks kept running and were not actually stopped, killed, or aborted:

Woke up after 4 sec.
Woke up after 8 sec.
Woke up after 5 sec.


I would expect just one line in the log, since the other processes should have been aborted before they had a chance to finish:

Woke up after 4 sec.


Is there a way to achieve what I want?

I'm not even sure I need asyncio here; perhaps concurrent.futures could be used too. Either way, the task is the same: terminate tasks that take too long to finish.

Answer

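You could use Process.terminate. When asyncio.wait_for times out, it cancels blocking_task, so asyncio.CancelledError is raised at the point where the coroutine is waiting on proc.wait(). Catch it there and terminate the subprocess: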

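try:
    yield from proc.wait()
except asyncio.CancelledError:
    # wait_for() cancelled this coroutine on timeout: stop the subprocess too
    proc.terminate()
    raise  # re-raise so the cancellation still propagates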
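
On newer Python versions that use async/await, the same idea could look roughly like the sketch below. It is only an illustration: run_with_timeout is a hypothetical helper name (not part of asyncio), and the commands and timeouts are made up. It applies wait_for to proc.wait() directly, terminates the process on timeout, and then waits for it once more so that it is reaped.

```python
import asyncio


async def run_with_timeout(cmd, timeout):
    """Run a shell command; return its exit code, or None if it timed out.

    Hypothetical helper for illustration, not part of asyncio.
    """
    proc = await asyncio.create_subprocess_shell(cmd)
    try:
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        # wait_for() only cancelled the wait; the process itself is still alive.
        try:
            proc.terminate()  # sends SIGTERM; proc.kill() would send SIGKILL
        except ProcessLookupError:
            pass  # the process exited on its own in the meantime
        await proc.wait()  # reap the process so no zombie is left behind
        return None


async def main():
    # Assumed example commands; "exec" replaces the shell with sleep,
    # so the signal reaches sleep itself rather than an intermediate shell.
    print(await run_with_timeout("exec sleep 5", 0.5))
    print(await run_with_timeout("exit 0", 5))


if __name__ == "__main__":
    asyncio.run(main())
```

Keep in mind that with create_subprocess_shell the signal is sent to the shell process. For a command line such as 'sleep 8; echo ...', killing the shell should prevent the echo from running, but a sleep that has already started may keep running until it finishes on its own.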