Tagc - 2 months ago

How to wait for coroutines to complete synchronously within a method if the event loop is already running?

I'm trying to create a Python-based CLI that communicates with a web service via websockets. One issue I'm encountering is that requests made by the CLI to the web service intermittently fail to get processed. Looking at the web service's logs, I can see that the problem is caused by these requests frequently being made at the same time as (or even after) the socket has closed:

2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed


In my CLI I define a class CommunicationService that is responsible for handling all direct communication with the web service. Internally, it uses the websockets package to handle communication, which itself is built on top of asyncio.

CommunicationService contains the following method for sending requests:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))
    asyncio.ensure_future(self._ws.send(request))


...where self._ws is a websocket opened earlier in another method:

self._ws = await websockets.connect(websocket_address)
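To illustrate why this is fragile: ensure_future only schedules the send, and the scheduled task does not start until the calling coroutine next yields to the loop. A minimal sketch, where fake_send is a hypothetical stand-in for self._ws.send:

```python
import asyncio

events = []


async def fake_send(request: str) -> None:
    # Hypothetical stand-in for self._ws.send(request).
    events.append(f"send started: {request}")
    await asyncio.sleep(0)  # yield once, as real I/O would
    events.append(f"send finished: {request}")


def send_request(request: str) -> None:
    # Same shape as the method above: schedule the send, then return immediately.
    asyncio.ensure_future(fake_send(request))
    events.append("send_request returned")


async def main() -> None:
    send_request("op")
    # Nothing has been sent yet; closing the socket here would race the send.
    events.append("about to close socket")
    await asyncio.sleep(0.01)  # only now does the scheduled task get to run


asyncio.run(main())
print(events)
# ['send_request returned', 'about to close socket', 'send started: op', 'send finished: op']
```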


What I want is to be able to await the future returned by asyncio.ensure_future and, if necessary, sleep for a short while afterwards to give the web service time to process the request before the websocket is closed.

However, since send_request is a synchronous method, it can't simply await these futures. Making it asynchronous would be pointless, as nothing would await the coroutine object it returns. I also can't use loop.run_until_complete, as the loop is already running by the time it is invoked.
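The run_until_complete restriction is easy to reproduce. In this minimal sketch (some_coro and sync_helper are hypothetical names), a synchronous helper that is itself executing inside the running loop tries to drive a coroutine to completion; CPython raises RuntimeError instead of blocking:

```python
import asyncio


async def some_coro() -> str:
    await asyncio.sleep(0)
    return "done"


def sync_helper() -> str:
    # Hypothetical synchronous function that happens to be called from inside the loop.
    loop = asyncio.get_running_loop()
    coro = some_coro()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # never scheduled, so close it to avoid a "never awaited" warning
        return f"RuntimeError: {exc}"


async def main() -> str:
    return sync_helper()


result = asyncio.run(main())
print(result)  # RuntimeError: This event loop is already running
```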

I found someone on mail.python.org describing a problem very similar to mine. The solution posted in that thread was to make the function return the coroutine object if the loop was already running:

import asyncio


def aio_map(coro, iterable, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    coroutines = map(coro, iterable)
    # Note: gather()'s loop argument was removed in Python 3.10.
    coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)

    if loop.is_running():
        return coros
    else:
        return loop.run_until_complete(coros)
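The same idea can be sketched for current asyncio, without an explicit loop argument. This is a hypothetical adaptation rather than the code from that thread; it shows that when a loop is already running, the caller gets back an awaitable that it must await itself, which is only useful if the caller is in a position to await it:

```python
import asyncio


def aio_map(coro_fn, iterable):
    """Hypothetical modern adaptation of aio_map (no loop parameter)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: drive everything to completion right here.
        async def run_all():
            return await asyncio.gather(
                *(coro_fn(x) for x in iterable), return_exceptions=True)
        return asyncio.run(run_all())
    # A loop is already running: hand back an awaitable; the caller must await it.
    return asyncio.gather(*(coro_fn(x) for x in iterable), return_exceptions=True)


async def double(x):
    await asyncio.sleep(0)
    return x * 2


# Called from synchronous code: the results come back directly.
sync_result = aio_map(double, [1, 2, 3])
print(sync_result)  # [2, 4, 6]


async def main():
    # Called from inside a coroutine: the returned future must be awaited.
    return await aio_map(double, [4, 5])


async_result = asyncio.run(main())
print(async_result)  # [8, 10]
```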


This is not possible for me, as I'm working with RxPY (the Python implementation of Reactive Extensions) and send_request is only ever called as an observer callback on an Rx observable, which means its return value is discarded and never reaches my code:

class AnonymousObserver(ObserverBase):
    ...
    def _on_next_core(self, value):
        self._next(value)


On a side note, I'm not sure whether this is a common problem with asyncio or whether I'm just not getting it, but I'm finding it pretty frustrating to use. In C#, for instance, all I would need is probably something like the following:

void SendRequest(string request)
{
    this.ws.Send(request).Wait();
    // Task.Delay(500).Wait(); // Uncomment if necessary
}


Meanwhile, asyncio's version of "wait" unhelpfully just returns another coroutine that I'm forced to discard.
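For comparison, the closest asyncio analogue to C#'s Task.Wait() is asyncio.run_coroutine_threadsafe(...).result(), but it only works when the caller runs on a different thread from the event loop; calling .result() on the loop's own thread would block the loop and deadlock. A minimal sketch, assuming a loop running in a background thread and a hypothetical fake_send coroutine standing in for self._ws.send:

```python
import asyncio
import threading


async def fake_send(request: str) -> str:
    # Hypothetical stand-in for self._ws.send(request).
    await asyncio.sleep(0.05)
    return f"sent {request}"


# Run the event loop in a background thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


def send_request_blocking(request: str) -> str:
    # Blocks the *calling* thread until the coroutine finishes on the loop's thread.
    future = asyncio.run_coroutine_threadsafe(fake_send(request), loop)
    return future.result(timeout=5)


result = send_request_blocking("hello")
print(result)  # sent hello

# Shut the background loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```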

Update

I've found a way around this issue that seems to work. I have an asynchronous callback that gets executed after the command has executed and before the CLI terminates, so I just changed it from this...

async def after_command():
    await comms.stop()


...to this:

async def after_command():
    await asyncio.sleep(0.25)  # Allow time for communication
    await comms.stop()


I'd still be happy to receive any answers to this problem for future reference, though. I might not be able to rely on workarounds like this in other situations, and I still think it would be better practice to have the delay executed inside send_request so that clients of CommunicationService do not have to concern themselves with timing issues.

Regarding Vincent's question:


Does your loop run in a different thread, or is send_request called by some callback?


Everything runs in the same thread; send_request is called by a callback. I define all my commands to use asynchronous callbacks, and when executed, some of them try to send a request to the web service. Since they're asynchronous, they don't do this until they're executed via a call to loop.run_until_complete at the top level of the CLI, which means the loop is already running by the time they're midway through execution and making this request (via an indirect call to send_request).

Update 2

Here's a solution based on Vincent's proposal of adding a "done" callback.

A new boolean field _busy is added to CommunicationService to represent whether comms activity is occurring.

CommunicationService.send_request is modified to set _busy to true before sending the request, and then attaches a done callback to the future wrapping _ws.send that resets _busy once the send completes:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    def callback(_):
        self._busy = False

    self._busy = True
    asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)


CommunicationService.stop is now implemented to wait for this flag to become false before proceeding:

async def stop(self) -> None:
    """
    Terminate communications with TestCube Web Service.
    """
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    while self._busy:
        await asyncio.sleep(0.1)

    # Allow short delay after final request is processed.
    await asyncio.sleep(0.1)

    self._listen_task.cancel()
    # asyncio.wait() needs tasks/futures, not bare coroutines (enforced since Python 3.11).
    await asyncio.wait([self._listen_task, asyncio.ensure_future(self._ws.close())])

    self._listen_task = None
    self._ws = None
    logger.info('Terminated connection to TestCube Web Service')


This seems to work too, and at least this way all communication timing logic is encapsulated within the CommunicationService class, as it should be.
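One caveat with a single boolean flag: if several sends overlap, the first one to finish clears _busy while the others are still in flight. A minimal reproduction, with asyncio.sleep standing in for self._ws.send and BusyFlagDemo being a hypothetical name:

```python
import asyncio


class BusyFlagDemo:
    """Minimal reproduction of the single _busy flag from Update 2."""

    def __init__(self):
        self._busy = False

    def send_request(self, delay: float) -> None:
        def callback(_):
            self._busy = False

        self._busy = True
        # asyncio.sleep(delay) stands in for self._ws.send(request).
        asyncio.ensure_future(asyncio.sleep(delay)).add_done_callback(callback)


async def main() -> bool:
    demo = BusyFlagDemo()
    demo.send_request(0.01)  # short send
    demo.send_request(5.0)   # long send, still in flight below
    await asyncio.sleep(0.1)
    # The short send's callback has already cleared the flag,
    # even though the long send has not finished.
    return demo._busy


busy = asyncio.run(main())  # asyncio.run cancels the leftover long task on exit
print(busy)  # False
```

Tracking the individual send tasks instead of a single flag avoids this.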

Update 3

Nicer solution based on Vincent's proposal.

Instead of self._busy we have self._send_request_tasks = [].

New send_request implementation:

def send_request(self, request: str) -> None:
    logger.debug('Sending request: {}'.format(request))

    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.append(task)


New stop implementation:

async def stop(self) -> None:
    if self._listen_task is None or self._ws is None:
        return

    # Wait for comms activity to stop.
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)
    ...

Answer

You could use a set of tasks:

self._send_request_tasks = set()

Schedule the tasks using ensure_future and clean up using add_done_callback:

def send_request(self, request: str) -> None:
    task = asyncio.ensure_future(self._ws.send(request))
    self._send_request_tasks.add(task)
    task.add_done_callback(self._send_request_tasks.remove)

And wait for the set of tasks to complete:

async def stop(self):
    if self._send_request_tasks:
        await asyncio.wait(self._send_request_tasks)
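A self-contained sketch of this approach, using a hypothetical FakeWebSocket whose send() fails if it runs after close(), so the ordering can be checked without a real server:

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in for a websockets connection."""

    def __init__(self):
        self.sent = []
        self.closed = False

    async def send(self, message: str) -> None:
        await asyncio.sleep(0.01)  # simulate network latency
        if self.closed:
            raise RuntimeError("send after close")
        self.sent.append(message)

    async def close(self) -> None:
        self.closed = True


class CommunicationService:
    def __init__(self, ws):
        self._ws = ws
        self._send_request_tasks = set()

    def send_request(self, request: str) -> None:
        # Schedule the send without blocking the synchronous caller.
        task = asyncio.ensure_future(self._ws.send(request))
        self._send_request_tasks.add(task)
        # Drop the task from the set once it completes, so the set never grows unbounded.
        task.add_done_callback(self._send_request_tasks.remove)

    async def stop(self) -> None:
        # Wait for every in-flight send before closing the socket.
        if self._send_request_tasks:
            await asyncio.wait(self._send_request_tasks)
        await self._ws.close()


async def main():
    ws = FakeWebSocket()
    comms = CommunicationService(ws)
    for i in range(3):
        comms.send_request(f"request {i}")
    await comms.stop()
    return ws.sent, len(comms._send_request_tasks)


sent, remaining = asyncio.run(main())
print(sorted(sent))
print(remaining)
```

asyncio.wait() copies the collection it is given, so the done callbacks can safely remove tasks from the set while stop() is waiting.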