josteinb josteinb - 5 months ago 12
Python Question

Read file line by line with asyncio

I wish to read several log files as they are written and process their input with asyncio. The code will have to run on Windows. From what I understand from searching around both Stack Overflow and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I thought I would try out asyncio to see what it is like. The most helpful answer would probably be one that describes what the "architecture" of a solution to this problem should look like, i.e. how different functions and coroutines should be called or scheduled.

The following gives me a generator that reads the files line by line (through polling, which is acceptable):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)


With several files to monitor and process, this sort of code would require threads. I have modified it slightly to be more usable with asyncio:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)


This sort of works when I schedule it through the asyncio event loop, but if process_data blocks, then that is of course not good. When starting out, I imagined the solution would look something like

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...


but I could not figure out how to make that work (at least not without process_data managing quite a bit of state).

Any ideas on how I should structure this kind of code?

Answer

From what I understand from searching around both Stack Overflow and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I thought I would try out asyncio to see what it is like.

Under the hood, asyncio is select-based on *nix systems, so you won't be able to do non-blocking file I/O there without using threads. On Windows, asyncio can use IOCP, which does support non-blocking file I/O at the operating-system level, but asyncio does not expose it for files.

Your code is fine, except that you should do blocking I/O calls in threads so that you don't block the event loop if the I/O is slow. Fortunately, it's really simple to offload work to threads using the loop.run_in_executor method.

First, set up a dedicated thread pool for your I/O:

from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()

And then simply offload any blocking I/O calls to the executor:

...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
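
To make the overall structure concrete, here is a minimal sketch of how these pieces could fit together. It assumes Python 3.7 or later and therefore uses async/await and an async generator instead of the yield from style shown above. The names follow_lines, handle_line and main are hypothetical, and the POLL_INTERVAL value is an assumption, since the question leaves it undefined.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

POLL_INTERVAL = 0.1  # seconds; assumed value, not given in the question

# Dedicated pool for blocking file reads, as set up above.
io_pool_exc = ThreadPoolExecutor()


async def follow_lines(f, poll_interval=POLL_INTERVAL):
    """Async generator (hypothetical name): yield lines as they are appended to f."""
    loop = asyncio.get_running_loop()
    while True:
        # readline() can block on slow storage, so run it in the thread pool.
        line = await loop.run_in_executor(io_pool_exc, f.readline)
        if not line:
            # Nothing new yet: wait without blocking the event loop.
            await asyncio.sleep(poll_interval)
            continue
        yield line


async def process_data(path, handle_line):
    """Consume one file; the polling state lives inside follow_lines."""
    with open(path, "r") as f:
        async for line in follow_lines(f):
            # handle_line is a hypothetical callback. It is called directly
            # here, so it must return quickly or it will stall the loop.
            handle_line(line)


async def main(paths, handle_line=print):
    """Follow several files concurrently, one coroutine per file."""
    await asyncio.gather(*(process_data(p, handle_line) for p in paths))
```

Under these assumptions, something like asyncio.run(main(["a.log", "b.log"])) would follow both files until interrupted. If the per-line processing itself blocks, it could be offloaded in the same way, for example with await loop.run_in_executor(None, handle_line, line).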