从流中产生的正确方法是什么?

What is the correct way to yield from a stream?

我有一个 Connection 类型,用于包装来自 asyncio 的 read/write 流对。

class Connection(object):

    def __init__(self, stream_in, stream_out):
        self._streams_ = (stream_in, stream_out)

    def read(self, n_bytes: int = -1):
        stream = self._streams_[0]
        return stream.read(n_bytes)

    def write(self, bytes_: bytes):
        stream = self._streams_[1]
        stream.write(bytes_)
        yield from stream.drain()

当一个新的客户端连接到服务器时,new_connection 将创建一个新的 Connection 对象,并期望接收 4 个字节。

@asyncio.coroutine
def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

客户端发送4个字节。

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

这按我的预期工作,但我必须为每次调用 readwrite 编写 yield from。出于这个原因,我尝试将 yield from 移动到 Connection

def read(self, n_bytes: int = -1):
    stream = self._streams_[0]
    data = yield from stream.read(n_bytes)
    return data

但是,我得到的不是预期的数据字节,而是一个生成器对象。

<generator object StreamReader.read at 0x1109983b8>

因此,对于 readwrite 的每次调用,我都必须小心 yield from。我的目标是将 new_connection 减少到以下值。

@asyncio.coroutine
def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))

我发现第 620 行的 StreamReader source code 的一部分实际上是函数用法的完美示例。

在我之前的回答中,我忽略了一个事实,即 self.__in.read(n_bytes) 不仅是一个协程(我应该知道它来自 asyncio 模块 XD),而且它会产生一个结果在线的 。所以它实际上是一个生成器,你需要从中产生。

从源代码中借用这个循环,你的读取函数应该是这样的:

def read(self, n_bytes : int = -1):
    data = bytearray() #or whatever object you are looking for
    while 1:
        block = yield from self.__in.read(n_bytes)
        if not block:
            break
        data += block
    return data

因为self.__in.read(n_bytes)是一个生成器,你必须继续从它产生,直到它产生一个空的结果来表示读取结束。现在你的读取函数应该 return 数据而不是生成器。您不必从这个版本的 conn.read().

屈服

因为 StreamReader.read is a coroutine, your only options for calling it are a) wrapping it in a Task or Future 和 运行 通过事件循环,b) await 从用 async def 定义的协程中调用它,或 c) 使用 yield from,它来自定义为用 @asyncio.coroutine.

装饰的函数的协程

由于 Connection.read 是从事件循环中调用的(通过协程 new_connection),您不能将该事件循环重用于 运行 a TaskFuture for StreamReader.read: event loops can't be started while they're already running. You'd either have to stop the event loop (disastrous and probably not possible to do correctly) or create a new event loop (混乱并且违背了使用协程的目的)。这些都不是可取的,因此 Connection.read 需要是协程或 async 函数。

其他两个选项(async def 协程中的 await@asyncio.coroutine 装饰函数中的 yield from)大部分是等效的。唯一的区别是 async def and await were added in Python 3.5,所以对于 3.4,使用 yield from@asyncio.coroutine 是唯一的选择(协程和 asyncio 在 3.4 之前不存在,所以其他版本是无关紧要的)。就个人而言,我更喜欢使用 async defawait,因为使用 async def 定义协程比使用装饰器更清晰。

简而言之:让 Connection.readnew_connection 成为协程(使用装饰器或 async 关键字),并使用 await(或 yield from) 当调用其他协程时(new_connection中的await conn.read(4),以及Connection.read中的await self.__in.read(n_bytes))。