从异步函数开始无限循环
Starting an endless loop from an async function
我正在尝试定期查询 API。我的代码结构如下:
async def QueryData():
print(datetime.datetime.now())
async def main():
await TestApiConnection()
scheduler = AsyncIOScheduler(timezone="Europe/Berlin")
scheduler.add_job(QueryData, 'cron', minute='0-59')
scheduler.start()
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
scheduler.shutdown(wait=False)
if __name__ == "__main__":
asyncio.run(main())
如果我运行这样会抛出以下错误:
in run_forever
assert self._self_reading_future is None AssertionError
如果我将主函数转换为同步函数,QueryData 作业将启动,结果是不再等待 TestApiConnection .
async def QueryData():
print(datetime.datetime.now())
def main():
TestApiConnection()
...
if __name__ == "__main__":
main()
那么如何从异步主方法启动作业呢?或者我应该重构代码?
您不能从事件循环的内部调用run_forever()
,该函数用于需要启动事件循环的同步代码。当你使用 asyncio.run()
时,你可以简单地 await 一些永远不会结束的东西。例如,您可以将 asyncio.get_event_loop().run_forever()
替换为:
await asyncio.Event().wait()
另请注意,在调用 scheduler.shutdown()
之前不需要 pass
。 pass
关键字的唯一目的是在语法需要声明的地方充当惰性替代品,而您无需提供任何内容。
我正在尝试定期查询 API。我的代码结构如下:
async def QueryData():
print(datetime.datetime.now())
async def main():
await TestApiConnection()
scheduler = AsyncIOScheduler(timezone="Europe/Berlin")
scheduler.add_job(QueryData, 'cron', minute='0-59')
scheduler.start()
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
scheduler.shutdown(wait=False)
if __name__ == "__main__":
asyncio.run(main())
如果我运行这样会抛出以下错误:
in run_forever assert self._self_reading_future is None AssertionError
如果我将主函数转换为同步函数,QueryData 作业将启动,结果是不再等待 TestApiConnection .
async def QueryData():
print(datetime.datetime.now())
def main():
TestApiConnection()
...
if __name__ == "__main__":
main()
那么如何从异步主方法启动作业呢?或者我应该重构代码?
您不能从事件循环的内部调用run_forever()
,该函数用于需要启动事件循环的同步代码。当你使用 asyncio.run()
时,你可以简单地 await 一些永远不会结束的东西。例如,您可以将 asyncio.get_event_loop().run_forever()
替换为:
await asyncio.Event().wait()
另请注意,在调用 scheduler.shutdown()
之前不需要 pass
。 pass
关键字的唯一目的是在语法需要声明的地方充当惰性替代品,而您无需提供任何内容。