使文件处理代码与 asyncio 兼容

Making file-handling code compatible with asyncio

图书馆接受文件输入的"traditional"方式是做这样的事情:

def foo(file_obj):
    data = file_obj.read()
    # Do other things here

客户端代码负责打开文件,寻找适当的点(如果需要),然后关闭它。如果客户想给我们一个管道或套接字(或 StringIO,就此而言),他们可以这样做,而且它就可以工作。

但这与 asyncio 不兼容,它需要更像这样的语法:

def foo(file_obj):
    data = yield from file_obj.read()
    # Do other things here

当然,这种语法只适用于 asyncio 对象;试图将它与传统文件对象一起使用会造成混乱。反之亦然。

更糟糕的是,在我看来没有办法将这个 yield from 包装在传统的 .read() 方法中,因为我们需要一直让步到事件循环,而不仅仅是在阅读发生的地点。 gevent 库确实做了类似的事情,但我不知道如何将他们的 greenlet 代码改编成生成器。

如果我正在编写处理文件输入的库,我应该如何处理这种情况?我需要两个版本的 foo() 函数吗?我有很多这样的功能;复制所有这些是不可扩展的。

我可以告诉我的客户开发人员使用 run_in_executor() 或类似的东西,但这感觉就像是在对抗 asyncio 而不是使用它。

这是显式异步框架的缺点之一。与 gevent 不同,它可以在不更改任何代码的情况下对同步代码进行 monkeypatch 使其成为异步代码,您不能使同步代码 asyncio 兼容而不重写它以使用 asyncio.coroutineyield from(或至少 asyncio.Futures 和回调)一直向下。

据我所知,无法在 asyncio 和正常的同步上下文中使相同的函数正常工作;任何 asyncio 兼容的代码都将依赖事件循环来 运行 驱动异步部分,因此它不会在正常上下文中工作,并且同步代码总是会以阻塞结束如果它的 运行 在 asyncio 上下文中,则事件循环。这就是为什么您通常会看到 asyncio-specific(或至少是异步框架特定的)库版本以及同步版本。没有什么好的方法可以提供一个统一的 API,同时适用于两者。

经过进一步考虑,我得出的结论是可以做到这一点,但它并不完美。

从繁体版入手foo():

def foo(file_obj):
    data = file_obj.read()
    # Do other things here

我们需要传递一个文件对象,它将在此处表现 "correctly"。当文件对象需要做I/O时,应该遵循这个过程:

  1. 它创建了一个新的 event
  2. 它创建一个闭包,当调用它时,它会执行必要的 I/O 然后设置事件。
  3. 它使用 call_soon_threadsafe() 将闭包交给事件循环。
  4. 它阻止了事件。

下面是一些示例代码:

import asyncio, threading

# inside the file object class
def read(self):
    event = threading.Event()
    def closure():
        # self.reader is an asyncio StreamReader or similar
        self._tmp = yield from self.reader.read()
        event.set()
    asyncio.get_event_loop().call_soon_threadsafe(closure)
    event.wait()
    return self._tmp

然后我们安排 foo(file_obj) 在执行程序中成为 运行(例如,按照 OP 中的建议使用 run_in_executor())。

这项技术的好处在于,即使 foo() 的作者不了解 asyncio,它也能发挥作用。它还确保在事件循环中提供 I/O,这在某些情况下可能是可取的。