mdelolmo mdelolmo - 1 month ago 19
Python Question

Make websocket callback asynchronous with asyncio

I am trying to implement a basic websocket client using asyncio and websockets with Python 3.5.2.

Basically, I want connect_to_dealer to be a blocking call, but wait for the websocket message on a different thread.

After reading some docs (I have very little experience with Python), I concluded that asyncio.ensure_future(), passing a coroutine (listen_for_message), was the way to go.

Now, I get to run listen_for_message on a different thread, but from within the coroutine I can't seem to use await or any other mechanism to make the calls synchronous. If I do, the execution waits forever (it hangs), even for a simple sleep.

I'd like to know what I'm doing wrong.

async def listen_for_message(self, future, websocket):
    while (True):
        try:
            await asyncio.sleep(1) # It hangs here
            print('Listening for a message...')
            message = await websocket.recv() # If I remove the sleep, hangs here
            print("< {}".format(message))
            future.set_result(message)
            future.done()
        except websockets.ConnectionClosed as cc:
            print('Connection closed')
        except Exception as e:
            print('Something happened')

def handle_connect_message(self, future):
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
    print(future.result)

async def connect_to_dealer(self):
    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket')
    hello_message = await websocket.recv()
    print("< {}".format(hello_message))
    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
    if sub_response.status_code == 200:
        print('Now we\'re observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))
    # Now we need to handle messages but continue with the regular execution
    try:
        future = asyncio.get_event_loop().create_future()
        future.add_done_callback(self.handle_connect_message)
        asyncio.ensure_future(self.listen_for_message(future, websocket))
    except Exception as e:
        print(e)

Answer

Is there a specific reason you need to work with explicit futures?

With asyncio you can use a combination of coroutines and Tasks to achieve most purposes. A Task wraps a coroutine and schedules it on the event loop, where it keeps advancing in the background, independently of other async code, so you don't have to explicitly manage its flow or juggle it with other bits of code. Note that a Task does not run on a separate thread: it runs on the same event loop and makes progress whenever the other code awaits.
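To make the Task idea concrete, here is a minimal sketch that needs no network. It assumes an asyncio.Queue as a stand-in for the websocket, and the names listener, main and the None "connection closed" sentinel are hypothetical, chosen only for illustration. The listener is started with asyncio.ensure_future() and consumes messages while main() carries on with its own work.

```python
import asyncio

events = []


async def listener(queue):
    # Stand-in for listen_for_message(): waits on a queue instead of a websocket.
    while True:
        message = await queue.get()
        if message is None:  # hypothetical sentinel meaning "connection closed"
            break
        events.append('received {}'.format(message))


async def main():
    queue = asyncio.Queue()
    # Wrap the coroutine in a Task; it starts running once main() yields control.
    task = asyncio.ensure_future(listener(queue))
    events.append('listener started')
    for n in range(3):
        await queue.put(n)
        await asyncio.sleep(0)  # yield to the event loop so the listener can run
    await queue.put(None)  # ask the listener to stop
    await task             # wait for the background task to finish
    return events


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(main())
loop.close()
print(result)
```

Everything here runs on one thread. The listener only advances at the points where main() awaits, which is why blocking calls inside a coroutine stall every task on the loop.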

I am not entirely sure of your end goal, but perhaps the approach elaborated below gives you something to work with:

import asyncio
import json

import requests
import websockets


async def listen_for_message(websocket):

    while True:

        await asyncio.sleep(0)

        try:
            print('Listening for a message...')
            message = await websocket.recv()

            print("< {}".format(message))

        except websockets.ConnectionClosed as cc:
            print('Connection closed')
            break  # stop instead of calling recv() again on a closed connection

        except Exception as e:
            print('Something happened: {}'.format(e))


async def connect_to_dealer():

    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket')

    hello_message = await websocket.recv()
    print("< {}".format(hello_message))

    # We need to parse the connection ID out of the message
    # (recv() returns text, so this assumes the hello message is JSON)
    connection_id = json.loads(hello_message)['connectionId']
    print('Got connection id {}'.format(connection_id))

    # `headers` is assumed to be defined elsewhere, as in the question.
    # Note that requests.put() blocks the event loop until the request completes.
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
        user_id='username', connection_id=connection_id), headers=headers)

    if sub_response.status_code == 200:
        print('Now we\'re observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))

    # hand the open connection back so the caller can start listening on it
    return websocket


async def my_app():

    # this will block until connect_to_dealer() returns
    websocket = await connect_to_dealer()

    # start listen_for_message() in its own task, so it keeps running in the background
    asyncio.ensure_future(listen_for_message(websocket))

    # you can continue with other code here that can now coexist with listen_for_message()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(my_app())
    loop.run_forever()
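The final loop.run_forever() call matters because a task scheduled with asyncio.ensure_future() only advances while the event loop is running. The sketch below illustrates this under simplified assumptions: background and main are hypothetical stand-ins for listen_for_message and my_app, and a second run_until_complete() call plays the role of run_forever() so that the script terminates.

```python
import asyncio

progress = []


async def background():
    # Stand-in for listen_for_message(): needs the loop to stay alive to finish.
    progress.append('started')
    await asyncio.sleep(0.01)
    progress.append('finished')


async def main():
    # Schedule the coroutine as a Task and return right away without awaiting it.
    return asyncio.ensure_future(background())


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

task = loop.run_until_complete(main())
# main() has returned, so the loop has stopped and the task is still pending.
done_after_main = task.done()

# Running the loop again lets the pending task complete
# (the answer uses loop.run_forever() for the same purpose).
loop.run_until_complete(task)
done_after_resume = task.done()
loop.close()

print(done_after_main, done_after_resume)
```

If the loop is never restarted after the first run_until_complete(), the background coroutine stays suspended at its await, which looks like a hang from the outside.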