Python 异步强制超时
Python asyncio force timeout
使用 asyncio 协程可以在超时后执行,因此它会在超时后被取消:
@asyncio.coroutine
def coro():
yield from asyncio.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 5))
以上示例按预期工作(5 秒后超时)。
然而,当协程不使用 asyncio.sleep()
(或其他异步协程)时,它似乎不会超时。示例:
@asyncio.coroutine
def coro():
import time
time.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 1))
运行 需要 10 多秒,因为 time.sleep(10)
没有被取消。在这种情况下是否可以强制取消协程?
如果要用asyncio来解决,我该怎么做?
不,您不能中断协程,除非它把控制权交还给事件循环,这意味着它需要在 yield from
调用中。 asyncio
是单线程的,因此当您在第二个示例中阻塞 time.sleep(10)
调用时,事件循环无法到达 运行。这意味着当您使用 wait_for
设置的超时到期时,事件循环将无法对其采取行动。事件循环没有机会再次 运行 直到 coro
退出,此时为时已晚。
这就是为什么一般来说,您应该始终避免任何非异步的阻塞调用;任何时候一个调用阻塞而不屈服于事件循环,你的程序中的任何其他东西都不能执行,这可能不是你想要的。如果你真的需要做一个长时间的阻塞操作,你应该尝试在线程或进程池中使用 BaseEventLoop.run_in_executor
到 运行 它,这将避免阻塞事件循环:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def coro(loop):
ex = ProcessPoolExecutor(2)
yield from loop.run_in_executor(ex, time.sleep, 10) # This can be interrupted.
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(loop), 1))
谢谢@dano 的回答。如果 运行 a coroutine
不是硬性要求,这里是一个重新设计的更紧凑的版本
import asyncio, time
timeout = 0.5
loop = asyncio.get_event_loop()
future = asyncio.wait_for(loop.run_in_executor(None, time.sleep, 2), timeout)
try:
loop.run_until_complete(future)
print('Thx for letting me sleep')
except asyncio.exceptions.TimeoutError:
print('I need more sleep !')
出于好奇,在我的 Python 3.8.2
中进行了一点调试,表明将 None
作为执行程序传递会导致创建 _default_executor
,如下所示:
self._default_executor = concurrent.futures.ThreadPoolExecutor()
我看到的超时处理示例非常简单。鉴于现实,我的应用程序有点复杂。顺序是:
- 当客户端连接到服务器时,让服务器创建另一个到内部服务器的连接
- 当内部服务器连接正常后,等待客户端发送数据。根据这些数据,我们可能会对内部服务器进行查询。
- 当有数据要发送到内部服务器时,发送它。由于内部服务器有时响应速度不够快,因此将此请求包装到超时中。
- 如果操作超时,则关闭所有连接以向客户端发出有关错误的信号
为了实现以上所有,同时保持事件循环运行,生成的代码包含以下代码:
def connection_made(self, transport):
self.client_lock_coro = self.client_lock.acquire()
asyncio.ensure_future(self.client_lock_coro).add_done_callback(self._got_client_lock)
def _got_client_lock(self, task):
task.result() # True at this point, but call there will trigger any exceptions
coro = self.loop.create_connection(lambda: ClientProtocol(self),
self.connect_info[0], self.connect_info[1])
asyncio.ensure_future(asyncio.wait_for(coro,
self.client_connect_timeout
)).add_done_callback(self.connected_server)
def connected_server(self, task):
transport, client_object = task.result()
self.client_transport = transport
self.client_lock.release()
def data_received(self, data_in):
asyncio.ensure_future(self.send_to_real_server(message, self.client_send_timeout))
def send_to_real_server(self, message, timeout=5.0):
yield from self.client_lock.acquire()
asyncio.ensure_future(asyncio.wait_for(self._send_to_real_server(message),
timeout, loop=self.loop)
).add_done_callback(self.sent_to_real_server)
@asyncio.coroutine
def _send_to_real_server(self, message):
self.client_transport.write(message)
def sent_to_real_server(self, task):
task.result()
self.client_lock.release()
使用 asyncio 协程可以在超时后执行,因此它会在超时后被取消:
@asyncio.coroutine
def coro():
yield from asyncio.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 5))
以上示例按预期工作(5 秒后超时)。
然而,当协程不使用 asyncio.sleep()
(或其他异步协程)时,它似乎不会超时。示例:
@asyncio.coroutine
def coro():
import time
time.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 1))
运行 需要 10 多秒,因为 time.sleep(10)
没有被取消。在这种情况下是否可以强制取消协程?
如果要用asyncio来解决,我该怎么做?
不,您不能中断协程,除非它把控制权交还给事件循环,这意味着它需要在 yield from
调用中。 asyncio
是单线程的,因此当您在第二个示例中阻塞 time.sleep(10)
调用时,事件循环无法到达 运行。这意味着当您使用 wait_for
设置的超时到期时,事件循环将无法对其采取行动。事件循环没有机会再次 运行 直到 coro
退出,此时为时已晚。
这就是为什么一般来说,您应该始终避免任何非异步的阻塞调用;任何时候一个调用阻塞而不屈服于事件循环,你的程序中的任何其他东西都不能执行,这可能不是你想要的。如果你真的需要做一个长时间的阻塞操作,你应该尝试在线程或进程池中使用 BaseEventLoop.run_in_executor
到 运行 它,这将避免阻塞事件循环:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
@asyncio.coroutine
def coro(loop):
ex = ProcessPoolExecutor(2)
yield from loop.run_in_executor(ex, time.sleep, 10) # This can be interrupted.
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(loop), 1))
谢谢@dano 的回答。如果 运行 a coroutine
不是硬性要求,这里是一个重新设计的更紧凑的版本
import asyncio, time
timeout = 0.5
loop = asyncio.get_event_loop()
future = asyncio.wait_for(loop.run_in_executor(None, time.sleep, 2), timeout)
try:
loop.run_until_complete(future)
print('Thx for letting me sleep')
except asyncio.exceptions.TimeoutError:
print('I need more sleep !')
出于好奇,在我的 Python 3.8.2
中进行了一点调试,表明将 None
作为执行程序传递会导致创建 _default_executor
,如下所示:
self._default_executor = concurrent.futures.ThreadPoolExecutor()
我看到的超时处理示例非常简单。鉴于现实,我的应用程序有点复杂。顺序是:
- 当客户端连接到服务器时,让服务器创建另一个到内部服务器的连接
- 当内部服务器连接正常后,等待客户端发送数据。根据这些数据,我们可能会对内部服务器进行查询。
- 当有数据要发送到内部服务器时,发送它。由于内部服务器有时响应速度不够快,因此将此请求包装到超时中。
- 如果操作超时,则关闭所有连接以向客户端发出有关错误的信号
为了实现以上所有,同时保持事件循环运行,生成的代码包含以下代码:
def connection_made(self, transport):
self.client_lock_coro = self.client_lock.acquire()
asyncio.ensure_future(self.client_lock_coro).add_done_callback(self._got_client_lock)
def _got_client_lock(self, task):
task.result() # True at this point, but call there will trigger any exceptions
coro = self.loop.create_connection(lambda: ClientProtocol(self),
self.connect_info[0], self.connect_info[1])
asyncio.ensure_future(asyncio.wait_for(coro,
self.client_connect_timeout
)).add_done_callback(self.connected_server)
def connected_server(self, task):
transport, client_object = task.result()
self.client_transport = transport
self.client_lock.release()
def data_received(self, data_in):
asyncio.ensure_future(self.send_to_real_server(message, self.client_send_timeout))
def send_to_real_server(self, message, timeout=5.0):
yield from self.client_lock.acquire()
asyncio.ensure_future(asyncio.wait_for(self._send_to_real_server(message),
timeout, loop=self.loop)
).add_done_callback(self.sent_to_real_server)
@asyncio.coroutine
def _send_to_real_server(self, message):
self.client_transport.write(message)
def sent_to_real_server(self, task):
task.result()
self.client_lock.release()