Danila Migalin - 2 months ago
Python Question

Stream result of blocking function to client in python Tornado HTTP server

I have various functions that use blocking I/O to get large amounts of data and can write it to a stream/file-like object (with chunking etc.); I also have a Tornado HTTP server to provide this data to clients. Since I cannot store all the data in memory, I should stream it to clients as I receive it from its source. So I wrote something like this:

    import logging

    logging.basicConfig(level=logging.DEBUG)

    from concurrent.futures import ThreadPoolExecutor
    from tornado import gen, httpserver, web, ioloop
    from threading import Event

    def get_data(stream):
        # open in binary mode so read() returns bytes and the b'' sentinel
        # actually terminates the loop
        with open('/tmp/qq.dat', 'rb') as file:
            for chunk in iter(lambda: file.read(64 * 1024), b''):
                stream.write(chunk)

    class ProxyStream(object):
        def __init__(self, request):
            self._request = request

        def write(self, data):
            self._request.write(data)
            event = Event()
            self._request.flush(callback=lambda: event.set())
            event.wait()  # block the worker thread until the chunk is flushed
            return len(data)

    class Test(web.RequestHandler):
        def initialize(self, workers):
            self._workers = workers

        @gen.coroutine
        def get(self):
            stream = ProxyStream(self)
            yield self._workers.submit(get_data, stream)
            logging.debug("GET done")
            self.finish()

    if __name__ == '__main__':
        workers = ThreadPoolExecutor(4)
        app = web.Application([
            (r"/test", Test, {'workers': workers}),
        ])

        server = httpserver.HTTPServer(app)
        server.bind(1488)
        server.start(1)
        ioloop.IOLoop.current().start()


In the code above, the get_data() function reads a file (which might be very big) and writes it in chunks to the stream passed as an argument. The stream is emulated by a ProxyStream object that writes the received data to the RequestHandler, waiting until each chunk is flushed to the network.

It appears that this code works as expected, but I still have some doubts: are there pitfalls in this approach, or maybe a better way to solve this problem?

Answer

I eventually ran into some problems that led me to a solution that looks better to me.

The RequestHandler write() and flush() methods are not thread-safe and must be called from the thread where the ioloop is running (see this and this). So the correct way is to wrap write() and flush() in IOLoop.add_callback, so that they are invoked on the next ioloop iteration. The resulting code is something like this:

    from threading import Event
    from tornado.ioloop import IOLoop

    class ProxyStream(object):
        def __init__(self, handler):
            self._handler = handler

        def sync_write(self, data, event):
            # runs on the ioloop thread
            self._handler.write(data)
            self._handler.flush(callback=lambda: event.set())

        def write(self, data):
            # called from a worker thread
            if not self._handler.dead:
                event = Event()
                IOLoop.current().add_callback(self.sync_write, data, event)
                event.wait()  # block until the ioloop has flushed the chunk
                return len(data)
            else:
                raise IOError("Client has closed connection")

(A RequestHandler that passes itself to synchronous code should set self.dead = True in on_connection_close(), so streaming stops once the client disconnects.)
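The core of the answer is a cross-thread handshake: the worker thread schedules the write on the loop thread and blocks on an Event until the flush callback fires. Below is a minimal, framework-free sketch of that pattern (an illustration using only the standard library, not the author's code: a plain callback queue drained by one thread stands in for IOLoop.add_callback, and write()/event.set() stand in for RequestHandler.write()/flush()):

    import queue
    import threading

    callbacks = queue.Queue()          # stands in for IOLoop.add_callback

    def loop_thread():
        """Drain callbacks in order, like the ioloop running on its own thread."""
        while True:
            cb = callbacks.get()
            if cb is None:             # sentinel to shut the loop down
                break
            cb()

    written = []

    def write(data, event):
        """Must only run on the loop thread (like RequestHandler.write/flush)."""
        written.append(data)
        event.set()                    # signal the waiting worker, like the flush callback

    def worker_write(data):
        """Called from a worker thread: schedule the write, then wait for it."""
        event = threading.Event()
        callbacks.put(lambda: write(data, event))
        event.wait()                   # block until the loop thread has handled it
        return len(data)

    loop = threading.Thread(target=loop_thread)
    loop.start()
    for chunk in (b"foo", b"barbaz"):
        worker_write(chunk)
    callbacks.put(None)                # stop the loop
    loop.join()
    print(written)                     # -> [b'foo', b'barbaz']

The Event.wait() is what provides backpressure: the worker cannot produce the next chunk until the previous one has actually been handed off, which is why the approach does not buffer the whole file in memory.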