从流中产生的正确方法是什么?
What is the correct way to yield from a stream?
我有一个 Connection
类型,用于包装来自 asyncio
的 read/write 流对。
class Connection(object):
def __init__(self, stream_in, stream_out):
self._streams_ = (stream_in, stream_out)
def read(self, n_bytes: int = -1):
stream = self._streams_[0]
return stream.read(n_bytes)
def write(self, bytes_: bytes):
stream = self._streams_[1]
stream.write(bytes_)
yield from stream.drain()
当一个新的客户端连接到服务器时,new_connection
将创建一个新的 Connection
对象,并期望接收 4 个字节。
@asyncio.coroutine
def new_connection(stream_in, stream_out):
conn = Connection(stream_in, stream_out)
data = yield from conn.read(4)
print(data)
客户端发送4个字节。
@asyncio.coroutine
def client(loop):
...
conn = Connection(stream_in, stream_out)
yield from conn.write(b'test')
这按我的预期工作,但我必须为每次调用 read
和 write
编写 yield from
。出于这个原因,我尝试将 yield from
移动到 Connection
。
def read(self, n_bytes: int = -1):
stream = self._streams_[0]
data = yield from stream.read(n_bytes)
return data
但是,我得到的不是预期的数据字节,而是一个生成器对象。
<generator object StreamReader.read at 0x1109983b8>
因此,对于 read
和 write
的每次调用,我都必须小心 yield from
。我的目标是将 new_connection
减少到以下值。
@asyncio.coroutine
def new_connection(stream_in, stream_out):
conn = Connection(stream_in, stream_out)
print(conn.read(4))
我发现第 620 行的 StreamReader source code 的一部分实际上是函数用法的完美示例。
在我之前的回答中,我忽略了一个事实,即 self.__in.read(n_bytes)
不仅是一个协程(我应该知道它来自 asyncio
模块 XD),而且它会产生一个结果在线的 。所以它实际上是一个生成器,你需要从中产生。
从源代码中借用这个循环,你的读取函数应该是这样的:
def read(self, n_bytes : int = -1):
data = bytearray() #or whatever object you are looking for
while 1:
block = yield from self.__in.read(n_bytes)
if not block:
break
data += block
return data
因为self.__in.read(n_bytes)
是一个生成器,你必须继续从它产生,直到它产生一个空的结果来表示读取结束。现在你的读取函数应该 return 数据而不是生成器。您不必从这个版本的 conn.read()
.
屈服
因为 StreamReader.read
is a coroutine, your only options for calling it are a) wrapping it in a Task
or Future
和 运行 通过事件循环,b) await
从用 async def
定义的协程中调用它,或 c) 使用 yield from
,它来自定义为用 @asyncio.coroutine
.
装饰的函数的协程
由于 Connection.read
是从事件循环中调用的(通过协程 new_connection
),您不能将该事件循环重用于 运行 a Task
或 Future
for StreamReader.read
: event loops can't be started while they're already running. You'd either have to stop the event loop (disastrous and probably not possible to do correctly) or create a new event loop (混乱并且违背了使用协程的目的)。这些都不是可取的,因此 Connection.read
需要是协程或 async
函数。
其他两个选项(async def
协程中的 await
或 @asyncio.coroutine
装饰函数中的 yield from
)大部分是等效的。唯一的区别是 async def
and await
were added in Python 3.5,所以对于 3.4,使用 yield from
和 @asyncio.coroutine
是唯一的选择(协程和 asyncio
在 3.4 之前不存在,所以其他版本是无关紧要的)。就个人而言,我更喜欢使用 async def
和 await
,因为使用 async def
定义协程比使用装饰器更清晰。
简而言之:让 Connection.read
和 new_connection
成为协程(使用装饰器或 async
关键字),并使用 await
(或 yield from
) 当调用其他协程时(new_connection
中的await conn.read(4)
,以及Connection.read
中的await self.__in.read(n_bytes)
)。
我有一个 Connection
类型,用于包装来自 asyncio
的 read/write 流对。
class Connection(object):
def __init__(self, stream_in, stream_out):
self._streams_ = (stream_in, stream_out)
def read(self, n_bytes: int = -1):
stream = self._streams_[0]
return stream.read(n_bytes)
def write(self, bytes_: bytes):
stream = self._streams_[1]
stream.write(bytes_)
yield from stream.drain()
当一个新的客户端连接到服务器时,new_connection
将创建一个新的 Connection
对象,并期望接收 4 个字节。
@asyncio.coroutine
def new_connection(stream_in, stream_out):
conn = Connection(stream_in, stream_out)
data = yield from conn.read(4)
print(data)
客户端发送4个字节。
@asyncio.coroutine
def client(loop):
...
conn = Connection(stream_in, stream_out)
yield from conn.write(b'test')
这按我的预期工作,但我必须为每次调用 read
和 write
编写 yield from
。出于这个原因,我尝试将 yield from
移动到 Connection
。
def read(self, n_bytes: int = -1):
stream = self._streams_[0]
data = yield from stream.read(n_bytes)
return data
但是,我得到的不是预期的数据字节,而是一个生成器对象。
<generator object StreamReader.read at 0x1109983b8>
因此,对于 read
和 write
的每次调用,我都必须小心 yield from
。我的目标是将 new_connection
减少到以下值。
@asyncio.coroutine
def new_connection(stream_in, stream_out):
conn = Connection(stream_in, stream_out)
print(conn.read(4))
我发现第 620 行的 StreamReader source code 的一部分实际上是函数用法的完美示例。
在我之前的回答中,我忽略了一个事实,即 self.__in.read(n_bytes)
不仅是一个协程(我应该知道它来自 asyncio
模块 XD),而且它会产生一个结果在线的 。所以它实际上是一个生成器,你需要从中产生。
从源代码中借用这个循环,你的读取函数应该是这样的:
def read(self, n_bytes : int = -1):
data = bytearray() #or whatever object you are looking for
while 1:
block = yield from self.__in.read(n_bytes)
if not block:
break
data += block
return data
因为self.__in.read(n_bytes)
是一个生成器,你必须继续从它产生,直到它产生一个空的结果来表示读取结束。现在你的读取函数应该 return 数据而不是生成器。您不必从这个版本的 conn.read()
.
因为 StreamReader.read
is a coroutine, your only options for calling it are a) wrapping it in a Task
or Future
和 运行 通过事件循环,b) await
从用 async def
定义的协程中调用它,或 c) 使用 yield from
,它来自定义为用 @asyncio.coroutine
.
由于 Connection.read
是从事件循环中调用的(通过协程 new_connection
),您不能将该事件循环重用于 运行 a Task
或 Future
for StreamReader.read
: event loops can't be started while they're already running. You'd either have to stop the event loop (disastrous and probably not possible to do correctly) or create a new event loop (混乱并且违背了使用协程的目的)。这些都不是可取的,因此 Connection.read
需要是协程或 async
函数。
其他两个选项(async def
协程中的 await
或 @asyncio.coroutine
装饰函数中的 yield from
)大部分是等效的。唯一的区别是 async def
and await
were added in Python 3.5,所以对于 3.4,使用 yield from
和 @asyncio.coroutine
是唯一的选择(协程和 asyncio
在 3.4 之前不存在,所以其他版本是无关紧要的)。就个人而言,我更喜欢使用 async def
和 await
,因为使用 async def
定义协程比使用装饰器更清晰。
简而言之:让 Connection.read
和 new_connection
成为协程(使用装饰器或 async
关键字),并使用 await
(或 yield from
) 当调用其他协程时(new_connection
中的await conn.read(4)
,以及Connection.read
中的await self.__in.read(n_bytes)
)。