如何在 apscheduler 作业中从 class 调用函数?

How to call function from class in apscheduler job?

我有一个 class 有触发器,我想在作业命中适当的时候更新这些触发器。

我的class:

class SetTrigger(object):
    def __init__(self):
        self.set_work = False
        self.set_reload = False
        self.set_refresh = False
    
    def UpdateSetRefresh(self):
        self.set_refresh = True
        return self.set_refresh

    def UpdateSetWork(self):
        self.set_work = True
        return self.set_work

    def UpdateSetReload(self):
        self.set_reload = True
        return self.set_reload

如果我将变量设置为从函数中获取值,如下所示,我会收到一个错误。

trigger = SetTrigger()
set_refresh = trigger.UpdateSetRefresh()

错误:

func must be a callable or a textual reference to one

我已经尝试调用 trigger.UpdateSetRefresh,但变量 set_refresh 没有更改为 True。如果我调用 trigger.UpdateSetRefresh() 它会给我上面同样的错误。

如何在 APScheduler 作业中从 class SetTrigger 调用函数 UpdateSetRefresh,如下所示?

我试过了:

scheduler.add_job(trigger.UpdateSetRefresh, 'interval', seconds=20, id='1', name='refresh_hereoes_positions', misfire_grace_time=180)

[编辑]

trigger.UpdateSetRefresh 未按我的需要返回 True 的示例:

import tzlocal
import time
import os
from bot import SetTrigger
from apscheduler.schedulers.asyncio import AsyncIOScheduler

trigger = SetTrigger()

scheduler = AsyncIOScheduler(timezone=str(tzlocal.get_localzone()))
scheduler.add_job(trigger.UpdateSetRefresh, 'interval', seconds=3)
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

try:
    # This is here to simulate application activity (which keeps the main thread alive).
    while True:
        time.sleep(2)
        print(trigger.set_refresh)
except (KeyboardInterrupt, SystemExit):
    # Not strictly necessary if daemonic mode is enabled but should be done if possible
    scheduler.shutdown()

输出:

Press Ctrl+Break to exit
False
False
False

您必须将函数传递给 add_job 但您传递的不是它而是 trigger.UpdateSetRefresh.
的返回值 所以正确的说法应该是:

scheduler.add_job(trigger.UpdateSetRefresh, 'interval', seconds=2, id='1', name='refresh_hereoes_positions')

而不是

scheduler.add_job(trigger.UpdateSetRefresh(), 'interval', seconds=2, id='1', name='refresh_hereoes_positions')

当您在函数名称末尾添加括号时,python 将执行它并将返回值传递给 add_job 函数。
有关将函数作为参数传递的更多信息,请查看此 link.

[编辑]
如果您使用的是 AsyncIOScheduler 你应该这样做

import tzlocal
import os
from bot import SetTrigger
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio


def DisplayValue(trigger):
    print(trigger.UpdateSetWork())


trigger = SetTrigger()
scheduler = AsyncIOScheduler(timezone=str(tzlocal.get_localzone()))
scheduler.add_job(trigger.UpdateSetRefresh, 'interval', seconds=3)
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))


try:
    loop = asyncio.get_event_loop()
    loop.call_at(10, displayValue, trigger)
    loop.run_forever()
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

或者你可以像那样使用后台调度程序

from apscheduler.schedulers.background import BackgroundScheduler
from bot import SetTrigger
import tzlocal
import time

trigger = SetTrigger()
scheduler = BackgroundScheduler()
scheduler.configure(timezone=str(tzlocal.get_localzone()))
scheduler.add_job(trigger.UpdateSetRefresh, 'interval', seconds=2, id='1', name='refresh_hereoes_positions')
scheduler.start()

while True:
    print(trigger.set_refresh)
    time.sleep(2)