使用 asyncio 逐行读取文件

Read file line by line with asyncio

我希望在写入多个日志文件时读取它们,并使用 asyncio 处理它们的输入。代码必须 运行 on windows。根据我在 Whosebug 和网络上搜索的了解,异步文件 I/O 在大多数操作系统上都很棘手(例如,select 将无法按预期工作)。虽然我确定我可以用其他方法(例如线程)做到这一点,但我还是会尝试使用 asyncio 看看它是什么样的。最有帮助的答案可能是描述此问题的解决方案 "architecture" 应该是什么样子的答案,即应该如何调用或调度不同的函数和协程。

下面给了我一个逐行读取文件的生成器(通过轮询,这是可以接受的):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)

由于要监视和处理多个文件,此类代码需要线程。我稍微修改了它以便更适用于 asyncio:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)

当我通过 asyncio 事件循环安排它时,这种方法有效,但如果 process_data 阻塞,那当然不好。开始时,我想象解决方案看起来像

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...

但我无法弄清楚如何进行这项工作(至少在没有 process_data 管理相当多的状态的情况下)。

关于我应该如何构建这种代码有什么想法吗?

asyncio暂不支持文件操作,抱歉。

因此无法解决您的问题。

我觉得你的代码结构不错,下面的代码在我的机器上运行良好:

import asyncio

PERIOD = 0.5

@asyncio.coroutine
def readline(f):
    while True:
        data = f.readline()
        if data:
            return data
        yield from asyncio.sleep(PERIOD)

@asyncio.coroutine
def test():
    with open('test.txt') as f:
        while True:
            line = yield from readline(f)
            print('Got: {!r}'.format(line))

loop = asyncio.get_event_loop()
loop.run_until_complete(test())

From what I understand from searching around both Whosebug and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I though I would try out asyncio to see what it is like.

asyncio select 底层基于 *nix 系统,因此您将无法执行非阻塞文件 I/O 不使用线程。在 Windows 上,asyncio 可以使用 IOCP,它支持非阻塞文件 I/O,但 asyncio 不支持。

你的代码很好,除了你应该在线程中阻塞 I/O 调用,这样如果 I/O 很慢你就不会阻塞事件循环。幸运的是,使用 loop.run_in_executor 函数将工作卸载到线程非常简单。

首先,为您的I/O设置一个专用线程池:

from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()

然后简单地将任何阻塞的 I/O 调用卸载到执行程序:

...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...

使用 aiofiles:

async with aiofiles.open('filename', mode='r') as f:
    async for line in f:
        print(line)

编辑 1

正如@Jashandeep 提到的,您应该关心阻塞操作:

另一种方法是select and or epoll:

from select import select

files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)

timeout参数在这里很重要。

参见:https://docs.python.org/3/library/select.html#select.select

编辑 2

您可以为 read/write 注册文件:loop.add_reader()

它在循环中使用内部 EPOLL 处理程序。

编辑 3

但请记住,Epoll 不适用于常规文件。