Eugene Naydenov - 2 months ago
Python Question

Listen to ZeroMQ in aiohttp application process

I run an aiohttp application with Gunicorn behind nginx.
In my application's initialization module I don't run the application with web.run_app(app); I just create an instance that Gunicorn imports and runs in each worker it creates.
So Gunicorn creates a few worker processes, an event loop within each of them, and then runs the application's request handler in those loops.

My aiohttp application has a collection of connected WebSockets (mobile application clients) that I want to notify when an event occurs in any of the application processes started by Gunicorn.
And I want to notify all WebSockets that are connected to all application processes.
Therefore I created a kind of upstream proxy using ZeroMQ, and I want to subscribe to it with a zmq.SUB socket from each application process.

...So basically I want to achieve something like this in each application worker:

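import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')
# A SUB socket receives nothing until it subscribes; b'' matches every message.
socket.setsockopt(zmq.SUBSCRIBE, b'')

while True:
    event = socket.recv()  # blocking call
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?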


How can I listen to the ZeroMQ proxy within the aiohttp application and forward messages to the WebSockets?

Where can I put this code so that it runs in the background within the event loop, and how do I start it and shut it down correctly within the aiohttp application's life cycle?




UPDATE

I've already created an issue in aiohttp's GitHub repository describing the problem and proposing a possible solution. I'd highly appreciate any input, here or there, on the problem described.

Answer

OK, the question and the discussion on this issue have led to new functionality that I've contributed to aiohttp: in version 1.0 we'll be able to register startup handlers through the Application.on_startup signal.

Documentation.
Working example on the master branch.

#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio

import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app

async def websocket_handler(request):
    ws = WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'].append(ws)
    try:
        async for msg in ws:
            print(msg)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(ws)
    return ws


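async def on_shutdown(app):
    # Iterate over a copy: closing a socket makes its handler remove it from the list.
    for ws in list(app['websockets']):
        # 1001 ("going away") is a valid close code; 999 is outside the 1000-4999 range.
        await ws.close(code=1001, message='Server shutdown')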


async def listen_to_redis(app):
    sub = ch = None  # stay None if the connection or subscription never completes
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets.
            # On aiohttp 3.0+ send_str() is a coroutine and must be awaited.
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
            print("message in {}: {}".format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        if ch is not None:
            await sub.unsubscribe(ch.name)
        if sub is not None:
            await sub.quit()
        print('Redis connection closed.')


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    print('cleanup background tasks...')
    app['redis_listener'].cancel()
    await app['redis_listener']


async def init(loop):
    app = Application(loop=loop)
    app['websockets'] = []
    app.router.add_get('/news', websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)
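
The example above listens to Redis, while the question was about ZeroMQ. The same start/cancel pattern could be sketched for a zmq.SUB socket roughly as follows. This is only a sketch under some assumptions: pyzmq with its zmq.asyncio module is installed, ws.send_bytes() is a coroutine (as in aiohttp 3.x), and the app keys 'zmq_endpoint' and 'zmq_listener' are names made up for illustration, not part of aiohttp.

```python
import asyncio
import contextlib

import zmq
import zmq.asyncio


async def listen_to_zmq(app):
    """Forward every message from a ZeroMQ SUB socket to all websockets."""
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.SUB)
    # 'zmq_endpoint' is a hypothetical key, e.g. 'tcp://localhost:5555'.
    socket.connect(app['zmq_endpoint'])
    # Empty prefix: subscribe to every message.
    socket.setsockopt(zmq.SUBSCRIBE, b'')
    try:
        while True:
            # Awaiting recv() yields to the event loop instead of blocking it.
            event = await socket.recv()
            # Copy the list: handlers may remove sockets while we await.
            for ws in list(app['websockets']):
                await ws.send_bytes(event)
    finally:
        # Runs on cancellation too, so the socket is always released.
        socket.close(linger=0)
        context.term()


async def start_zmq_listener(app):
    # Meant for app.on_startup: the event loop is already running here.
    app['zmq_listener'] = asyncio.create_task(listen_to_zmq(app))


async def stop_zmq_listener(app):
    # Meant for app.on_cleanup: cancel the task, then wait for its finally block.
    app['zmq_listener'].cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await app['zmq_listener']
```

These would be wired in the same way as the Redis handlers: app.on_startup.append(start_zmq_listener) and app.on_cleanup.append(stop_zmq_listener). Note that an exception raised by send_bytes() (for instance on a socket that is already closing) would end the listener loop, so real code would probably want to catch and log it per websocket.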