如何使用 AsyncIOScheduler 到另一个函数的 运行 函数?
How to use AsyncIOScheduler to run function from another function?
我很难理解 AsyncIOScheduler
的工作原理以及如何在主函数中安排一个函数,或者如何才是正确的方法。
如何每 10 秒 运行 函数 foo
?
假设我有这个结构:
package/
controllers.py
main.py
从文件 controllers.py
中我得到了一个名为 foo
的函数,该函数是这样的:
async def foo():
print('Hello World')
我想 运行 文件 main.py
中的函数 foo
(以及许多其他函数),这个文件如下:
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
# Init message
print('\nPress Ctrl-C to quit at anytime!\n' )
await asyncio.create_task(bar())
scheduler = AsyncIOScheduler()
scheduler.add_job(await asyncio.create_task(foo()), "interval", seconds=10)
scheduler.start()
if __name__ == "__main__":
while True:
try:
asyncio.run(main())
except Exception as e:
print("Exception: " + str(e))
调度器这样运行对吗?还是每次主函数运行ning?
都会重置定时器
我尝试了下面的代码,循环间隔有效,但 main
函数无效。
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
# Init message
print('\nPress Ctrl-C to quit at anytime!\n' )
await asyncio.create_task(bar())
if __name__ == "__main__":
scheduler = AsyncIOScheduler()
scheduler.add_job(foo, "interval", seconds=10)
scheduler.start()
while True:
try:
asyncio.get_event_loop().run_forever()
asyncio.run(main())
except Exception as e:
print("Exception: " + str(e))
如果我更改了顺序:
asyncio.get_event_loop().run_forever()
asyncio.run(main())
至:
asyncio.run(main())
asyncio.get_event_loop().run_forever()
我收到错误:There is no current event loop in thread 'MainThread'.
循环有效(仅限调度程序),但 main
函数不是 运行ning,我如何将它们放在循环中?
我想你想 运行 你 main()
永远发挥作用,所以你可以这样试试。
controllers.py
from datetime import datetime
async def foo():
print(f'{datetime.now()} Foo')
async def bar():
print(f'{datetime.now()} Bar')
main.py
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
# Init message
print('\nPress Ctrl-C to quit at anytime!\n')
scheduler = AsyncIOScheduler()
scheduler.add_job(foo, "interval", seconds=2)
scheduler.start()
while True:
await asyncio.create_task(bar())
await asyncio.sleep(1)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
输出:
Press Ctrl-C to quit at anytime!
2022-01-09 03:38:10.468914 Bar
2022-01-09 03:38:11.472966 Bar
2022-01-09 03:38:12.468882 Foo
2022-01-09 03:38:12.473449 Bar
2022-01-09 03:38:13.477678 Bar
2022-01-09 03:38:14.469389 Foo
2022-01-09 03:38:14.480098 Bar
2022-01-09 03:38:15.483889 Bar
2022-01-09 03:38:16.472584 Foo
2022-01-09 03:38:16.487105 Bar
2022-01-09 03:38:17.492550 Bar
2022-01-09 03:38:18.467403 Foo
2022-01-09 03:38:18.493330 Bar
2022-01-09 03:38:19.496680 Bar
2022-01-09 03:38:20.471553 Foo
2022-01-09 03:38:20.499928 Bar
2022-01-09 03:38:21.503572 Bar
2022-01-09 03:38:22.470329 Foo
2022-01-09 03:38:22.505597 Bar
2022-01-09 03:38:23.510854 Bar
2022-01-09 03:38:24.471455 Foo
...
此外,您可以 运行 周期函数而不使用 AsyncIOScheduler
,例如 。
我很难理解 AsyncIOScheduler
的工作原理以及如何在主函数中安排一个函数,或者如何才是正确的方法。
如何每 10 秒 运行 函数 foo
?
假设我有这个结构:
package/
controllers.py
main.py
从文件 controllers.py
中我得到了一个名为 foo
的函数,该函数是这样的:
async def foo():
print('Hello World')
我想 运行 文件 main.py
中的函数 foo
(以及许多其他函数),这个文件如下:
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
# Init message
print('\nPress Ctrl-C to quit at anytime!\n' )
await asyncio.create_task(bar())
scheduler = AsyncIOScheduler()
scheduler.add_job(await asyncio.create_task(foo()), "interval", seconds=10)
scheduler.start()
if __name__ == "__main__":
while True:
try:
asyncio.run(main())
except Exception as e:
print("Exception: " + str(e))
调度器这样运行对吗?还是每次主函数运行ning?
都会重置定时器我尝试了下面的代码,循环间隔有效,但 main
函数无效。
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
# Init message
print('\nPress Ctrl-C to quit at anytime!\n' )
await asyncio.create_task(bar())
if __name__ == "__main__":
scheduler = AsyncIOScheduler()
scheduler.add_job(foo, "interval", seconds=10)
scheduler.start()
while True:
try:
asyncio.get_event_loop().run_forever()
asyncio.run(main())
except Exception as e:
print("Exception: " + str(e))
如果我更改了顺序:
asyncio.get_event_loop().run_forever()
asyncio.run(main())
至:
asyncio.run(main())
asyncio.get_event_loop().run_forever()
我收到错误:There is no current event loop in thread 'MainThread'.
循环有效(仅限调度程序),但 main
函数不是 运行ning,我如何将它们放在循环中?
我想你想 运行 你 main()
永远发挥作用,所以你可以这样试试。
controllers.py
from datetime import datetime
async def foo():
print(f'{datetime.now()} Foo')
async def bar():
print(f'{datetime.now()} Bar')
main.py
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
# Init message
print('\nPress Ctrl-C to quit at anytime!\n')
scheduler = AsyncIOScheduler()
scheduler.add_job(foo, "interval", seconds=2)
scheduler.start()
while True:
await asyncio.create_task(bar())
await asyncio.sleep(1)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
输出:
Press Ctrl-C to quit at anytime!
2022-01-09 03:38:10.468914 Bar
2022-01-09 03:38:11.472966 Bar
2022-01-09 03:38:12.468882 Foo
2022-01-09 03:38:12.473449 Bar
2022-01-09 03:38:13.477678 Bar
2022-01-09 03:38:14.469389 Foo
2022-01-09 03:38:14.480098 Bar
2022-01-09 03:38:15.483889 Bar
2022-01-09 03:38:16.472584 Foo
2022-01-09 03:38:16.487105 Bar
2022-01-09 03:38:17.492550 Bar
2022-01-09 03:38:18.467403 Foo
2022-01-09 03:38:18.493330 Bar
2022-01-09 03:38:19.496680 Bar
2022-01-09 03:38:20.471553 Foo
2022-01-09 03:38:20.499928 Bar
2022-01-09 03:38:21.503572 Bar
2022-01-09 03:38:22.470329 Foo
2022-01-09 03:38:22.505597 Bar
2022-01-09 03:38:23.510854 Bar
2022-01-09 03:38:24.471455 Foo
...
此外,您可以 运行 周期函数而不使用 AsyncIOScheduler
,例如