使用 asyncio 逐行读取文件
Read file line by line with asyncio
我希望在写入多个日志文件时读取它们,并使用 asyncio 处理它们的输入。代码必须 运行 on windows。根据我在 Whosebug 和网络上搜索的了解,异步文件 I/O 在大多数操作系统上都很棘手(例如,select
将无法按预期工作)。虽然我确定我可以用其他方法(例如线程)做到这一点,但我还是会尝试使用 asyncio 看看它是什么样的。最有帮助的答案可能是描述此问题的解决方案 "architecture" 应该是什么样子的答案,即应该如何调用或调度不同的函数和协程。
下面给了我一个逐行读取文件的生成器(通过轮询,这是可以接受的):
import time
def line_reader(f):
while True:
line = f.readline()
if not line:
time.sleep(POLL_INTERVAL)
continue
process_line(line)
由于要监视和处理多个文件,此类代码需要线程。我稍微修改了它以便更适用于 asyncio:
import asyncio
def line_reader(f):
while True:
line = f.readline()
if not line:
yield from asyncio.sleep(POLL_INTERVAL)
continue
process_line(line)
当我通过 asyncio 事件循环安排它时,这种方法有效,但如果 process_data
阻塞,那当然不好。开始时,我想象解决方案看起来像
def process_data():
...
while True:
...
line = yield from line_reader()
...
但我无法弄清楚如何进行这项工作(至少在没有 process_data
管理相当多的状态的情况下)。
关于我应该如何构建这种代码有什么想法吗?
asyncio
暂不支持文件操作,抱歉。
因此无法解决您的问题。
我觉得你的代码结构不错,下面的代码在我的机器上运行良好:
import asyncio
PERIOD = 0.5
@asyncio.coroutine
def readline(f):
while True:
data = f.readline()
if data:
return data
yield from asyncio.sleep(PERIOD)
@asyncio.coroutine
def test():
with open('test.txt') as f:
while True:
line = yield from readline(f)
print('Got: {!r}'.format(line))
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
From what I understand from searching around both Whosebug and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I though I would try out asyncio to see what it is like.
asyncio
是 select
底层基于 *nix 系统,因此您将无法执行非阻塞文件 I/O 不使用线程。在 Windows 上,asyncio
可以使用 IOCP,它支持非阻塞文件 I/O,但 asyncio
不支持。
你的代码很好,除了你应该在线程中阻塞 I/O 调用,这样如果 I/O 很慢你就不会阻塞事件循环。幸运的是,使用 loop.run_in_executor
函数将工作卸载到线程非常简单。
首先,为您的I/O设置一个专用线程池:
from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()
然后简单地将任何阻塞的 I/O 调用卸载到执行程序:
...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
使用 aiofiles:
async with aiofiles.open('filename', mode='r') as f:
async for line in f:
print(line)
编辑 1
正如@Jashandeep 提到的,您应该关心阻塞操作:
另一种方法是select
and or epoll
:
from select import select
files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)
timeout
参数在这里很重要。
参见:https://docs.python.org/3/library/select.html#select.select
编辑 2
您可以为 read/write 注册文件:loop.add_reader()
它在循环中使用内部 EPOLL 处理程序。
编辑 3
但请记住,Epoll 不适用于常规文件。
我希望在写入多个日志文件时读取它们,并使用 asyncio 处理它们的输入。代码必须 运行 on windows。根据我在 Whosebug 和网络上搜索的了解,异步文件 I/O 在大多数操作系统上都很棘手(例如,select
将无法按预期工作)。虽然我确定我可以用其他方法(例如线程)做到这一点,但我还是会尝试使用 asyncio 看看它是什么样的。最有帮助的答案可能是描述此问题的解决方案 "architecture" 应该是什么样子的答案,即应该如何调用或调度不同的函数和协程。
下面给了我一个逐行读取文件的生成器(通过轮询,这是可以接受的):
import time
def line_reader(f):
while True:
line = f.readline()
if not line:
time.sleep(POLL_INTERVAL)
continue
process_line(line)
由于要监视和处理多个文件,此类代码需要线程。我稍微修改了它以便更适用于 asyncio:
import asyncio
def line_reader(f):
while True:
line = f.readline()
if not line:
yield from asyncio.sleep(POLL_INTERVAL)
continue
process_line(line)
当我通过 asyncio 事件循环安排它时,这种方法有效,但如果 process_data
阻塞,那当然不好。开始时,我想象解决方案看起来像
def process_data():
...
while True:
...
line = yield from line_reader()
...
但我无法弄清楚如何进行这项工作(至少在没有 process_data
管理相当多的状态的情况下)。
关于我应该如何构建这种代码有什么想法吗?
asyncio
暂不支持文件操作,抱歉。
因此无法解决您的问题。
我觉得你的代码结构不错,下面的代码在我的机器上运行良好:
import asyncio
PERIOD = 0.5
@asyncio.coroutine
def readline(f):
while True:
data = f.readline()
if data:
return data
yield from asyncio.sleep(PERIOD)
@asyncio.coroutine
def test():
with open('test.txt') as f:
while True:
line = yield from readline(f)
print('Got: {!r}'.format(line))
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
From what I understand from searching around both Whosebug and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I though I would try out asyncio to see what it is like.
asyncio
是 select
底层基于 *nix 系统,因此您将无法执行非阻塞文件 I/O 不使用线程。在 Windows 上,asyncio
可以使用 IOCP,它支持非阻塞文件 I/O,但 asyncio
不支持。
你的代码很好,除了你应该在线程中阻塞 I/O 调用,这样如果 I/O 很慢你就不会阻塞事件循环。幸运的是,使用 loop.run_in_executor
函数将工作卸载到线程非常简单。
首先,为您的I/O设置一个专用线程池:
from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()
然后简单地将任何阻塞的 I/O 调用卸载到执行程序:
...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
使用 aiofiles:
async with aiofiles.open('filename', mode='r') as f:
async for line in f:
print(line)
编辑 1
正如@Jashandeep 提到的,您应该关心阻塞操作:
另一种方法是select
and or epoll
:
from select import select
files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)
timeout
参数在这里很重要。
参见:https://docs.python.org/3/library/select.html#select.select
编辑 2
您可以为 read/write 注册文件:loop.add_reader()
它在循环中使用内部 EPOLL 处理程序。
编辑 3
但请记住,Epoll 不适用于常规文件。