如何检测异步中的写入失败?

How to detect write failure in asyncio?

作为一个简单的例子,请考虑下面 /dev/zero 的网络等价物。 (或者更现实地说,只是一个网络服务器发送一个大文件。)

如果客户端提前断开连接,您会收到一连串的日志消息:

WARNING:asyncio:socket.send() raised exception.

但我没有找到任何方法来捕获上述异常。假设的服务器继续从磁盘读取千兆字节并将它们发送到死套接字,而客户端没有任何努力,并且您自己遭到了 DoS 攻击。

我从文档中发现的唯一一件事是从读取中产生,空字符串表示关闭。但这在这里并不好,因为普通客户端不会发送任何东西,从而阻塞写循环。

使用流 API 或其他方式检测写入失败或收到 TCP 连接已关闭通知的正确方法是什么?

代码:

from asyncio import *
import logging

@coroutine
def client_handler(reader, writer):
    while True:
        writer.write(bytes(1))
        yield from writer.drain()

logging.basicConfig(level=logging.INFO)
loop = get_event_loop()
coro = start_server(client_handler, '', 12345)
server = loop.run_until_complete(coro)
loop.run_forever()

基于流的 API 没有您可以在连接关闭时指定的回调。但协议 API 确实如此,因此请改用它:https://docs.python.org/3/library/asyncio-protocol.html#connection-callbacks

这有点奇怪,但实际上您可以允许异常到达 client_handler 协程,方法是强制它在一次迭代中将控制权交给事件循环:

import asyncio
import logging

@asyncio.coroutine
def client_handler(reader, writer):
    while True:
        writer.write(bytes(1))
        yield  # Yield to the event loop
        yield from writer.drain()

logging.basicConfig(level=logging.INFO)
loop = asyncio.get_event_loop()
coro = asyncio.start_server(client_handler, '', 12345)
server = loop.run_until_complete(coro)
loop.run_forever()

如果我这样做,我会在终止客户端连接时得到以下输出:

ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<client_handler() done, defined at aio.py:4> exception=ConnectionResetError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "aio.py", line 9, in client_handler
    yield from writer.drain()
  File "/usr/lib/python3.4/asyncio/streams.py", line 301, in drain
    raise exc
  File "/usr/lib/python3.4/asyncio/selector_events.py", line 700, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer

我真的不太清楚为什么您需要显式地让事件循环控制异常通过 - 目前没有时间深入研究它。我假设需要翻转一些位以指示连接断开,并且在循环中调用 yield from writer.drain()(它可以通过事件循环短路)可以防止这种情况发生,但我真的不确定。如果我有机会进行调查,我会用该信息更新答案。

我深入研究了 asyncio 源代码,以扩展 关于为什么在未明确将控制权传递给事件循环的情况下不引发异常的答案。这是我的发现。

调用 yield from wirter.drain() 将控制权交给 StreamWriter.drain coroutine. This coroutine checks for and raises any exceptions that that the StreamReaderProtocol set on the StreamReader. But since we passed control over to drain, the protocol hasn't had the chance to set the exception yet. drain then gives control over to the FlowControlMixin._drain_helper 协程。这个协程立即 returns 因为还没有设置更多的标志,并且控件最终返回调用 yield from wirter.drain().

的协程

所以我们绕了一整圈,没有将控制权交给事件循环以允许它处理其他协程并将异常冒泡到 writer.drain()

drain() 之前

yielding 使 transport/protocol 有机会设置适当的标志和例外。

这是正在发生的事情的模型,所有嵌套调用都已折叠:

import asyncio as aio

def set_exception(ctx, exc):
  ctx["exc"] = exc

@aio.coroutine
def drain(ctx):
  if ctx["exc"] is not None:
    raise ctx["exc"]

  return

@aio.coroutine
def client_handler(ctx):
  i = 0
  while True:
    i += 1
    print("write", i)
    # yield # Uncommenting this allows the loop.call_later call to be scheduled.
    yield from drain(ctx)

CTX = {"exc": None}

loop = aio.get_event_loop()
# Set the exception in 5 seconds
loop.call_later(5, set_exception, CTX, Exception("connection lost"))
loop.run_until_complete(client_handler(CTX))
loop.close()

这可能应该由 asyncio 开发人员在 Streams API 的上游修复。