Zelta Zelta - 1 month ago 11
Python Question

Scheduling an asyncio coroutine from another thread without bunch of callbacks and synchronous waiting

I have to ask a clarification for this question.

I have a coroutine

send
that sends a message. I want to schedule in the
loop1
(which is running in thread 1) from the
loop2
(which is running in thread 2):

async def send_threadsafe(self, message, current_loop=loop2, dest_loop=loop1):
future = asyncio.run_coroutine_threadsafe(
send(message), loop=dest_loop
)


Now, the
future
returned by
asyncio.run_coroutine_threadsafe
is a
concurrent.futures.Future
. It cannot be waited asynchronously.

So the question is: how do I properly await
future
and/or how should I schedule my
send
to get an awaitable object?

I know I can do:

async def send_threadsafe(...):
future = ...
result = await current_loop.run_in_executor(None, future.result)


But is there a way to do it without using another thread? Because
run_in_executor
will send
future.result
to a threadpool and I don't want to utilize that threadpool.

The reason I don't want to use
call_soon_threadsafe
is that it requires creating several callbacks: first, to schedule running
send
in
loop1
, second, to actually run
send
in
loop1
and than schedule the third callback in
loop2
, and third, to set a result to a future that was created in the first callback (because asyncio futures are not threadsafe and I can't set result from
loop1
).

Answer

You can use asyncio.wrap_future to get an asyncio future from a concurrent future:

async def send_threadsafe(self, message, destination, *, loop=loop):
    concurrent = asyncio.run_coroutine_threadsafe(
        send(message), loop=destination)
    return await asyncio.wrap_future(concurrent, loop=loop)

It is possible to achieve the same thing by implementing an asyncio executor:

from concurrent.futures import Executor

class AsyncioExecutor(Executor):

    def __init__(self, loop):
        self.loop = loop

    def submit(self, fn, *args, **kwargs):
        coro = fn(*args, **kwargs)
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

Example:

executor = AsyncioExecutor(remote_loop)
result = await loop.run_in_executor(executor, send, message)