如何在异步服务器中实现超时?

How to implement timeout in asyncio server?

下面是一个简单的回显服务器。但是如果客户端在 10 秒内没有发送任何东西,我想关闭连接。

import asyncio


async def process(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    print("awaiting for data")
    line = await reader.readline()
    print(f"received {line}")
    writer.write(line)
    print(f"sent {line}")
    await writer.drain()
    print(f"Drained")


async def timeout(task: asyncio.Task, duration):
    print("timeout started")
    await asyncio.sleep(duration)
    print("client unresponsive, cancelling")
    task.cancel()
    print("task cancelled")


async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    await task
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")


async def a_main():
    server = await asyncio.start_server(new_session, port=8088)
    await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(a_main())

如果客户端发送消息,它工作正常。但另一种情况,当客户端静默时,它不起作用

客户端发送消息时:

new session started
awaiting for data
timeout started
received b'slkdfjsdlkfj\r\n'
sent b'slkdfjsdlkfj\r\n'
Drained
task complete
timer cancelled
writer closed

客户端打开连接后静默时

new session started
awaiting for data
timeout started
client unresponsive, cancelling
task cancelled

没有task complete,timer cancelled,writer closed.

  1. 上面的代码有什么问题?
  2. 是否有更好的方法来实现超时?

更新

解决了问题,看起来任务实际上被取消了,但异常被默默地忽略了,通过捕获 CancelledError

解决了问题
async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task took too long and was cancelled by timer")
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")

第二部分还在。有没有更好的方法来实现超时?


更新2

使用 wait_for 完成代码。不再需要超时代码。检查已接受 下面:

async def new_session(reader, writer):
    print("new session started")
    try:
        await asyncio.wait_for(process(reader, writer), timeout=5)
    except asyncio.TimeoutError as te:
        print(f'time is up!{te}')
    finally:
        writer.close()
        print("writer closed")

我在建立连接时使用了以下代码。我建议对您的代码类似地使用 wait_for。

fut = asyncio.open_connection( self.host, self.port, loop=self.loop )
try:
   r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
   pass

Is there a better way to implement timeouts?

您可以使用 asyncio.wait_for 而不是 timeout。它具有相似的语义,但已经带有 asyncio。另外,你可以等待它的未来 returns 来检测是否发生超时。