监控异步事件循环

Monitoring the asyncio event loop

我正在使用 python3 编写应用程序,这是我第一次尝试使用 asyncio。我遇到的一个问题是我的一些协同程序阻塞事件循环的时间比我希望的要长。我试图在事件循环的顶部找到一些东西,它会显示 wall/cpu 我的每个协同程序花费了多少 运行 时间。如果没有任何东西已经存在,有没有人知道一种方法可以将挂钩添加到事件循环以便我可以进行测量?

我尝试使用 cProfile,它提供了一些有用的输出,但我更感兴趣的是阻塞事件循环所花费的时间,而不是总执行时间。

事件循环已经可以跟踪协程是否需要很多 CPU 时间来执行。要查看它,您应该 enable debug mode 使用 set_debug 方法:

import asyncio
import time


async def main():
    time.sleep(1)  # Block event loop


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_debug(True)  # Enable debug
    loop.run_until_complete(main())

在输出中你会看到:

Executing <Task finished coro=<main() [...]> took 1.016 seconds

默认情况下,它会针对阻塞时间超过 0.1 秒的协程显示警告。它没有记录,但基于 asyncio source code,看起来你可以更改 slow_callback_duration 属性来修改此值。

您可以使用 call_later。定期 运行 回调将 log/notify 循环时间和周期间隔时间的差异。

class EventLoopDelayMonitor:

    def __init__(self, loop=None, start=True, interval=1, logger=None):
        self._interval = interval
        self._log = logger or logging.getLogger(__name__)
        self._loop = loop or asyncio.get_event_loop()
        if start:
            self.start()

    def run(self):
        self._loop.call_later(self._interval, self._handler, self._loop.time())

    def _handler(self, start_time):
        latency = (self._loop.time() - start_time) - self._interval
        self._log.error('EventLoop delay %.4f', latency)
        if not self.is_stopped():
            self.run()

    def is_stopped(self):
        return self._stopped

    def start(self):
        self._stopped = False
        self.run()

    def stop(self):
        self._stopped = True

例子

import time

async def main():
    EventLoopDelayMonitor(interval=1)
    await asyncio.sleep(1)
    time.sleep(2)
    await asyncio.sleep(1)
    await asyncio.sleep(1)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出

EventLoop delay 0.0013
EventLoop delay 1.0026
EventLoop delay 0.0014
EventLoop delay 0.0015

为了扩展其中一个答案,如果您想监视循环 检测挂起,这里有一个片段可以做到这一点。它启动一个单独的线程来检查循环的任务最近是否足够执行。

def monitor_loop(loop, delay_handler):
loop = loop
last_call = loop.time()

INTERVAL = .5  # How often to poll the loop and check the current delay.
def run_last_call_updater():
    loop.call_later(INTERVAL, last_call_updater)
def last_call_updater():
    nonlocal last_call
    last_call = loop.time()
    run_last_call_updater()
run_last_call_updater()

def last_call_checker():
    threading.Timer(INTERVAL / 2, last_call_checker).start()
    if loop.time() - last_call > INTERVAL:
        delay_handler(loop.time() - last_call)
threading.Thread(target=last_call_checker).start()

对于 2019 年阅读本文的任何人来说,这可能是一个更好的答案:yappi。使用 Yappi 版本 1.2.1>=,您可以在本地分析协程并准确查看在协程中花费了多少墙或 cpu 时间。

有关此协程分析的详细信息,请参阅here