如何使用 AsyncIOScheduler 到另一个函数的 运行 函数?

How to use AsyncIOScheduler to run function from another function?

我很难理解 AsyncIOScheduler 的工作原理以及如何在主函数中安排一个函数,或者如何才是正确的方法。

如何每 10 秒 运行 函数 foo

假设我有这个结构:

package/
    controllers.py
    main.py

从文件 controllers.py 中我得到了一个名为 foo 的函数,该函数是这样的:

async def foo():
    print('Hello World')

我想 运行 文件 main.py 中的函数 foo(以及许多其他函数),这个文件如下:

import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def main():

    # Init message
    print('\nPress Ctrl-C to quit at anytime!\n' )
    
    await asyncio.create_task(bar())

    scheduler = AsyncIOScheduler()
    scheduler.add_job(await asyncio.create_task(foo()), "interval", seconds=10)
    scheduler.start()

if __name__ == "__main__":
    while True:
        try:
            asyncio.run(main())
        except Exception as e:
            print("Exception: " + str(e))

调度器这样运行对吗?还是每次主函数运行ning?

都会重置定时器

我尝试了下面的代码,循环间隔有效,但 main 函数无效。

import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def main():

    # Init message
    print('\nPress Ctrl-C to quit at anytime!\n' )
    
    await asyncio.create_task(bar())

if __name__ == "__main__":

    scheduler = AsyncIOScheduler()
    scheduler.add_job(foo, "interval", seconds=10)
    scheduler.start()

    while True:
        try:
            asyncio.get_event_loop().run_forever()
            asyncio.run(main())
        except Exception as e:
            print("Exception: " + str(e))

如果我更改了顺序:

            asyncio.get_event_loop().run_forever()
            asyncio.run(main())

至:

            asyncio.run(main())
            asyncio.get_event_loop().run_forever()

我收到错误:There is no current event loop in thread 'MainThread'.

循环有效(仅限调度程序),但 main 函数不是 运行ning,我如何将它们放在循环中?

我想你想 运行 你 main() 永远发挥作用,所以你可以这样试试。

controllers.py

from datetime import datetime


async def foo():
    print(f'{datetime.now()} Foo')


async def bar():
    print(f'{datetime.now()} Bar')

main.py

import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def main():
    # Init message
    print('\nPress Ctrl-C to quit at anytime!\n')

    scheduler = AsyncIOScheduler()
    scheduler.add_job(foo, "interval", seconds=2)
    scheduler.start()
    
    while True:
        await asyncio.create_task(bar())
        await asyncio.sleep(1)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())
    loop.run_forever()

输出:

Press Ctrl-C to quit at anytime!
2022-01-09 03:38:10.468914 Bar
2022-01-09 03:38:11.472966 Bar
2022-01-09 03:38:12.468882 Foo
2022-01-09 03:38:12.473449 Bar
2022-01-09 03:38:13.477678 Bar
2022-01-09 03:38:14.469389 Foo
2022-01-09 03:38:14.480098 Bar
2022-01-09 03:38:15.483889 Bar
2022-01-09 03:38:16.472584 Foo
2022-01-09 03:38:16.487105 Bar
2022-01-09 03:38:17.492550 Bar
2022-01-09 03:38:18.467403 Foo
2022-01-09 03:38:18.493330 Bar
2022-01-09 03:38:19.496680 Bar
2022-01-09 03:38:20.471553 Foo
2022-01-09 03:38:20.499928 Bar
2022-01-09 03:38:21.503572 Bar
2022-01-09 03:38:22.470329 Foo
2022-01-09 03:38:22.505597 Bar
2022-01-09 03:38:23.510854 Bar
2022-01-09 03:38:24.471455 Foo
...

此外,您可以 运行 周期函数而不使用 AsyncIOScheduler,例如