使文件处理代码与 asyncio 兼容
Making file-handling code compatible with asyncio
图书馆接受文件输入的"traditional"方式是做这样的事情:
def foo(file_obj):
data = file_obj.read()
# Do other things here
客户端代码负责打开文件,寻找适当的点(如果需要),然后关闭它。如果客户想给我们一个管道或套接字(或 StringIO
,就此而言),他们可以这样做,而且它就可以工作。
但这与 asyncio 不兼容,它需要更像这样的语法:
def foo(file_obj):
data = yield from file_obj.read()
# Do other things here
当然,这种语法只适用于 asyncio 对象;试图将它与传统文件对象一起使用会造成混乱。反之亦然。
更糟糕的是,在我看来没有办法将这个 yield from
包装在传统的 .read()
方法中,因为我们需要一直让步到事件循环,而不仅仅是在阅读发生的地点。 gevent 库确实做了类似的事情,但我不知道如何将他们的 greenlet 代码改编成生成器。
如果我正在编写处理文件输入的库,我应该如何处理这种情况?我需要两个版本的 foo()
函数吗?我有很多这样的功能;复制所有这些是不可扩展的。
我可以告诉我的客户开发人员使用 run_in_executor()
或类似的东西,但这感觉就像是在对抗 asyncio 而不是使用它。
这是显式异步框架的缺点之一。与 gevent
不同,它可以在不更改任何代码的情况下对同步代码进行 monkeypatch 使其成为异步代码,您不能使同步代码 asyncio
兼容而不重写它以使用 asyncio.coroutine
和 yield from
(或至少 asyncio.Futures
和回调)一直向下。
据我所知,无法在 asyncio
和正常的同步上下文中使相同的函数正常工作;任何 asyncio
兼容的代码都将依赖事件循环来 运行 驱动异步部分,因此它不会在正常上下文中工作,并且同步代码总是会以阻塞结束如果它的 运行 在 asyncio
上下文中,则事件循环。这就是为什么您通常会看到 asyncio
-specific(或至少是异步框架特定的)库版本以及同步版本。没有什么好的方法可以提供一个统一的 API,同时适用于两者。
经过进一步考虑,我得出的结论是可以做到这一点,但它并不完美。
从繁体版入手foo()
:
def foo(file_obj):
data = file_obj.read()
# Do other things here
我们需要传递一个文件对象,它将在此处表现 "correctly"。当文件对象需要做I/O时,应该遵循这个过程:
- 它创建了一个新的 event。
- 它创建一个闭包,当调用它时,它会执行必要的 I/O 然后设置事件。
- 它使用
call_soon_threadsafe()
将闭包交给事件循环。
- 它阻止了事件。
下面是一些示例代码:
import asyncio, threading
# inside the file object class
def read(self):
event = threading.Event()
def closure():
# self.reader is an asyncio StreamReader or similar
self._tmp = yield from self.reader.read()
event.set()
asyncio.get_event_loop().call_soon_threadsafe(closure)
event.wait()
return self._tmp
然后我们安排 foo(file_obj)
在执行程序中成为 运行(例如,按照 OP 中的建议使用 run_in_executor()
)。
这项技术的好处在于,即使 foo()
的作者不了解 asyncio
,它也能发挥作用。它还确保在事件循环中提供 I/O,这在某些情况下可能是可取的。
图书馆接受文件输入的"traditional"方式是做这样的事情:
def foo(file_obj):
data = file_obj.read()
# Do other things here
客户端代码负责打开文件,寻找适当的点(如果需要),然后关闭它。如果客户想给我们一个管道或套接字(或 StringIO
,就此而言),他们可以这样做,而且它就可以工作。
但这与 asyncio 不兼容,它需要更像这样的语法:
def foo(file_obj):
data = yield from file_obj.read()
# Do other things here
当然,这种语法只适用于 asyncio 对象;试图将它与传统文件对象一起使用会造成混乱。反之亦然。
更糟糕的是,在我看来没有办法将这个 yield from
包装在传统的 .read()
方法中,因为我们需要一直让步到事件循环,而不仅仅是在阅读发生的地点。 gevent 库确实做了类似的事情,但我不知道如何将他们的 greenlet 代码改编成生成器。
如果我正在编写处理文件输入的库,我应该如何处理这种情况?我需要两个版本的 foo()
函数吗?我有很多这样的功能;复制所有这些是不可扩展的。
我可以告诉我的客户开发人员使用 run_in_executor()
或类似的东西,但这感觉就像是在对抗 asyncio 而不是使用它。
这是显式异步框架的缺点之一。与 gevent
不同,它可以在不更改任何代码的情况下对同步代码进行 monkeypatch 使其成为异步代码,您不能使同步代码 asyncio
兼容而不重写它以使用 asyncio.coroutine
和 yield from
(或至少 asyncio.Futures
和回调)一直向下。
据我所知,无法在 asyncio
和正常的同步上下文中使相同的函数正常工作;任何 asyncio
兼容的代码都将依赖事件循环来 运行 驱动异步部分,因此它不会在正常上下文中工作,并且同步代码总是会以阻塞结束如果它的 运行 在 asyncio
上下文中,则事件循环。这就是为什么您通常会看到 asyncio
-specific(或至少是异步框架特定的)库版本以及同步版本。没有什么好的方法可以提供一个统一的 API,同时适用于两者。
经过进一步考虑,我得出的结论是可以做到这一点,但它并不完美。
从繁体版入手foo()
:
def foo(file_obj):
data = file_obj.read()
# Do other things here
我们需要传递一个文件对象,它将在此处表现 "correctly"。当文件对象需要做I/O时,应该遵循这个过程:
- 它创建了一个新的 event。
- 它创建一个闭包,当调用它时,它会执行必要的 I/O 然后设置事件。
- 它使用
call_soon_threadsafe()
将闭包交给事件循环。 - 它阻止了事件。
下面是一些示例代码:
import asyncio, threading
# inside the file object class
def read(self):
event = threading.Event()
def closure():
# self.reader is an asyncio StreamReader or similar
self._tmp = yield from self.reader.read()
event.set()
asyncio.get_event_loop().call_soon_threadsafe(closure)
event.wait()
return self._tmp
然后我们安排 foo(file_obj)
在执行程序中成为 运行(例如,按照 OP 中的建议使用 run_in_executor()
)。
这项技术的好处在于,即使 foo()
的作者不了解 asyncio
,它也能发挥作用。它还确保在事件循环中提供 I/O,这在某些情况下可能是可取的。