为什么要显式调用 asyncio.StreamWriter.drain?
Why should asyncio.StreamWriter.drain be explicitly called?
来自 doc:
write(data)
Write data to the stream.
This method is not subject to flow control. Calls to write() should be followed by drain().
coroutine drain()
Wait until it is appropriate to resume writing to the stream. Example:
writer.write(data)
await writer.drain()
据我了解,
- 每次调用
write
都需要调用drain
。
- 如果不是我猜,
write
会阻塞循环线程
那write为什么不是自动调用的协程呢?为什么会一呼write
而不必漏?我可以想到两种情况
- 您想
write
和 close
立即
- 您必须在消息完成之前缓冲一些数据。
第一个是特例,我想我们可以有不同的API。缓冲应该在写函数内部处理,应用程序不应该关心。
让我换个方式提出问题。这样做的缺点是什么? python3.8 版本是否有效地做到了这一点?
async def awrite(writer, data):
writer.write(data)
await writer.drain()
注意:drain
文档明确指出以下内容:
When there is nothing to wait for, the drain()
returns immediately.
再次阅读答案和链接,我认为函数是这样工作的。 注意:检查已接受的答案以获得更准确的版本。
def write(data):
remaining = socket.try_write(data)
if remaining:
_pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data
async def drain():
if len(_pendingbuffer) < BUF_LIMIT:
return
await wait_until_other_side_is_up_to_speed()
assert len(_pendingbuffer) < BUF_LIMIT
async def awrite(writer, data):
writer.write(data)
await writer.drain()
那么什么时候用什么:
- 当数据不连续时,比如响应HTTP请求。我们只需要发送一些数据,而不关心何时到达,内存也不是问题 - 只需使用
write
- 同上但内存是个问题,使用
awrite
- 当向大量客户端流式传输数据时(例如一些实时流或一个巨大的文件)。如果数据在每个连接的缓冲区中都是重复的,它肯定会溢出 RAM。在这种情况下,编写一个循环,每次迭代获取一大块数据并调用
awrite
。如果文件很大,loop.sendfile
如果可用则更好。
From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread
两者都不正确,但混淆是可以理解的。 write()
的工作方式如下:
调用 write()
只是将数据存储到缓冲区,将其留给事件循环以在稍后实际写出,而无需程序进一步干预。就应用程序而言,数据在后台写入的速度与另一方接收数据的速度一样快。换句话说,每个 write()
将安排其数据使用尽可能多的 OS 级写入来传输,这些写入在相应的文件描述符实际可写时发出。所有这一切都会自动发生,甚至无需等待 drain()
.
write()
不是协程,它绝对从不 阻塞事件循环。
第二个 属性 听起来很方便 - 您可以在任何需要的地方调用 write()
,即使不是 async def
的函数 - 但它实际上是一个主要的 write()
的缺陷。流 API 公开的写入与接受数据的 OS 完全分离,因此如果您写入数据的速度快于网络对等体读取数据的速度,内部缓冲区将不断增长,您将拥有memory leak 在你手上。 drain()
修复了这个问题:如果写入缓冲区变得太大,等待它暂停协程,并在 os.write()
在后台成功执行并且缓冲区缩小后再次恢复它。
您不需要在 每次 写入之后等待 drain()
,但您确实需要偶尔等待它,通常是在循环的迭代之间 write()
被调用。例如:
while True:
response = await peer1.readline()
peer2.write(b'<response>')
peer2.write(response)
peer2.write(b'</response>')
await peer2.drain()
drain()
returns 如果待处理的未写入数据量很小,则立即。如果数据超过高阈值,drain()
将暂停调用协程,直到待处理的未写入数据量降至低阈值以下。暂停将导致协程停止从 peer1
读取数据,这反过来又会导致对等方减慢它向我们发送数据的速率。这种反馈称为背压。
Buffering should be handled inside write function and application should not care.
这几乎就是 write()
现在的工作方式 - 它确实处理缓冲并且让应用程序不关心,无论好坏。另请参阅 了解更多信息。
解决问题的编辑部分:
Reading the answer and links again, I think the the functions work like this.
write()
还是比那个聪明一点。它不会尝试只写一次,它实际上会安排数据继续写入,直到没有数据可写为止。即使您从不 await drain()
也会发生这种情况 - 应用程序唯一必须做的就是让事件循环 运行 它的过程足够长以写出所有内容。
更正确的 write
和 drain
伪代码可能如下所示:
class ToyWriter:
def __init__(self):
self._buf = bytearray()
self._empty = asyncio.Event(True)
def write(self, data):
self._buf.extend(data)
loop.add_writer(self._fd, self._do_write)
self._empty.clear()
def _do_write(self):
# Automatically invoked by the event loop when the
# file descriptor is writable, regardless of whether
# anyone calls drain()
while self._buf:
try:
nwritten = os.write(self._fd, self._buf)
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return # continue once we're writable again
raise
self._buf = self._buf[nwritten:]
self._empty.set()
loop.remove_writer(self._fd, self._do_write)
async def drain(self):
if len(self._buf) > 64*1024:
await self._empty.wait()
实际实现比较复杂,因为:
- 它写在Twisted-style transport/protocol layer with its own sophisticated flow control之上,而不是在
os.write
之上;
drain()
并不是真正等到缓冲区为空,而是等到 low watermark;
_do_write
中引发的 EWOULDBLOCK
以外的异常被存储并在 drain()
中重新引发。
最后一点是 另一个 调用 drain()
的充分理由 - 实际注意到对等点已消失,因为写入失败。
来自 doc:
write(data)
Write data to the stream. This method is not subject to flow control. Calls to write() should be followed by drain().
coroutine drain()
Wait until it is appropriate to resume writing to the stream. Example: writer.write(data) await writer.drain()
据我了解,
- 每次调用
write
都需要调用drain
。 - 如果不是我猜,
write
会阻塞循环线程
那write为什么不是自动调用的协程呢?为什么会一呼write
而不必漏?我可以想到两种情况
- 您想
write
和close
立即 - 您必须在消息完成之前缓冲一些数据。
第一个是特例,我想我们可以有不同的API。缓冲应该在写函数内部处理,应用程序不应该关心。
让我换个方式提出问题。这样做的缺点是什么? python3.8 版本是否有效地做到了这一点?
async def awrite(writer, data):
writer.write(data)
await writer.drain()
注意:drain
文档明确指出以下内容:
When there is nothing to wait for, the
drain()
returns immediately.
再次阅读答案和链接,我认为函数是这样工作的。 注意:检查已接受的答案以获得更准确的版本。
def write(data):
remaining = socket.try_write(data)
if remaining:
_pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data
async def drain():
if len(_pendingbuffer) < BUF_LIMIT:
return
await wait_until_other_side_is_up_to_speed()
assert len(_pendingbuffer) < BUF_LIMIT
async def awrite(writer, data):
writer.write(data)
await writer.drain()
那么什么时候用什么:
- 当数据不连续时,比如响应HTTP请求。我们只需要发送一些数据,而不关心何时到达,内存也不是问题 - 只需使用
write
- 同上但内存是个问题,使用
awrite
- 当向大量客户端流式传输数据时(例如一些实时流或一个巨大的文件)。如果数据在每个连接的缓冲区中都是重复的,它肯定会溢出 RAM。在这种情况下,编写一个循环,每次迭代获取一大块数据并调用
awrite
。如果文件很大,loop.sendfile
如果可用则更好。
From what I understand, (1) You need to call drain every time write is called. (2) If not I guess, write will block the loop thread
两者都不正确,但混淆是可以理解的。 write()
的工作方式如下:
调用
write()
只是将数据存储到缓冲区,将其留给事件循环以在稍后实际写出,而无需程序进一步干预。就应用程序而言,数据在后台写入的速度与另一方接收数据的速度一样快。换句话说,每个write()
将安排其数据使用尽可能多的 OS 级写入来传输,这些写入在相应的文件描述符实际可写时发出。所有这一切都会自动发生,甚至无需等待drain()
.write()
不是协程,它绝对从不 阻塞事件循环。
第二个 属性 听起来很方便 - 您可以在任何需要的地方调用 write()
,即使不是 async def
的函数 - 但它实际上是一个主要的 write()
的缺陷。流 API 公开的写入与接受数据的 OS 完全分离,因此如果您写入数据的速度快于网络对等体读取数据的速度,内部缓冲区将不断增长,您将拥有memory leak 在你手上。 drain()
修复了这个问题:如果写入缓冲区变得太大,等待它暂停协程,并在 os.write()
在后台成功执行并且缓冲区缩小后再次恢复它。
您不需要在 每次 写入之后等待 drain()
,但您确实需要偶尔等待它,通常是在循环的迭代之间 write()
被调用。例如:
while True:
response = await peer1.readline()
peer2.write(b'<response>')
peer2.write(response)
peer2.write(b'</response>')
await peer2.drain()
drain()
returns 如果待处理的未写入数据量很小,则立即。如果数据超过高阈值,drain()
将暂停调用协程,直到待处理的未写入数据量降至低阈值以下。暂停将导致协程停止从 peer1
读取数据,这反过来又会导致对等方减慢它向我们发送数据的速率。这种反馈称为背压。
Buffering should be handled inside write function and application should not care.
这几乎就是 write()
现在的工作方式 - 它确实处理缓冲并且让应用程序不关心,无论好坏。另请参阅
解决问题的编辑部分:
Reading the answer and links again, I think the the functions work like this.
write()
还是比那个聪明一点。它不会尝试只写一次,它实际上会安排数据继续写入,直到没有数据可写为止。即使您从不 await drain()
也会发生这种情况 - 应用程序唯一必须做的就是让事件循环 运行 它的过程足够长以写出所有内容。
更正确的 write
和 drain
伪代码可能如下所示:
class ToyWriter:
def __init__(self):
self._buf = bytearray()
self._empty = asyncio.Event(True)
def write(self, data):
self._buf.extend(data)
loop.add_writer(self._fd, self._do_write)
self._empty.clear()
def _do_write(self):
# Automatically invoked by the event loop when the
# file descriptor is writable, regardless of whether
# anyone calls drain()
while self._buf:
try:
nwritten = os.write(self._fd, self._buf)
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return # continue once we're writable again
raise
self._buf = self._buf[nwritten:]
self._empty.set()
loop.remove_writer(self._fd, self._do_write)
async def drain(self):
if len(self._buf) > 64*1024:
await self._empty.wait()
实际实现比较复杂,因为:
- 它写在Twisted-style transport/protocol layer with its own sophisticated flow control之上,而不是在
os.write
之上; drain()
并不是真正等到缓冲区为空,而是等到 low watermark;_do_write
中引发的EWOULDBLOCK
以外的异常被存储并在drain()
中重新引发。
最后一点是 另一个 调用 drain()
的充分理由 - 实际注意到对等点已消失,因为写入失败。