apsheduler 关注点分离打破了它
apsheduler separation of concerns breaks it
关于此应用程序应如何工作,我的心智模型是能够将调用 add_job 的进程与管理该进程的进程分开。
当我在开始计划之前添加作业时,这工作正常,但当我尝试将它分成单独的功能时,它却没有。为什么? add_job后是否有commit函数需要调用
我使用的是 sqlite 和 BlockingScheduler,尽管为了调试而移至 postgresql,但它们对我的目的更有意义。
from datetime import datetime, timedelta
from time import sleep
import logging
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
import click
from crontab import CronTab
import pytz
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
jobstores = {
'default': SQLAlchemyJobStore(url='postgresql+psycopg2://myusername:mypassword@localhost/mydb')
}
executors = {
'default': ThreadPoolExecutor(5),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
sched = BackgroundScheduler(jobstores=jobstores, timezone=pytz.timezone('Australia/Sydney'))
def my_job(text):
now = datetime.now()
print(f'{now} text: {text}')
@click.group()
def cli():
pass
@click.command()
@click.option('--message', default='<BLANK>', help='to display when ')
@click.option('--crontab', default='*/1 * * * *', help='Timestamp of ')
def add_job(message, crontab):
# entry = CronTab('0 0 ? * TUE,THU')
entry = CronTab(crontab)
number_of_seconds = entry.next()
timestamp = datetime.now(pytz.timezone('Australia/Sydney')) + timedelta(seconds=number_of_seconds)
move_service_message = f'Service {message} will be moved @ {timestamp}'
sched.add_job(
my_job,
'date',
run_date=timestamp,
args=[move_service_message]
)
print('added job:' + move_service_message)
@click.command()
def start():
# this will wait forever
sched.start()
try:
# This is here to simulate application activity (which keeps the main thread alive).
while True:
sleep(10)
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
sched.shutdown()
cli.add_command(start)
cli.add_command(add_job)
if __name__ == "__main__":
exit_code = 0 # assume it will be okay
time_started = datetime.now()
try:
cli()
except Exception as e:
print('Exception:', e)
exit_code = 1
finally:
exit(exit_code)
我的包是最新的,包括
APScheduler 3.6.0
SQLAlchemy 1.3.1
FAQ 中似乎没有我认为可能的内容。虽然只是修改了我的期望,但我仍然会使用它。
关于此应用程序应如何工作,我的心智模型是能够将调用 add_job 的进程与管理该进程的进程分开。
当我在开始计划之前添加作业时,这工作正常,但当我尝试将它分成单独的功能时,它却没有。为什么? add_job后是否有commit函数需要调用
我使用的是 sqlite 和 BlockingScheduler,尽管为了调试而移至 postgresql,但它们对我的目的更有意义。
from datetime import datetime, timedelta
from time import sleep
import logging
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
import click
from crontab import CronTab
import pytz
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
jobstores = {
'default': SQLAlchemyJobStore(url='postgresql+psycopg2://myusername:mypassword@localhost/mydb')
}
executors = {
'default': ThreadPoolExecutor(5),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
sched = BackgroundScheduler(jobstores=jobstores, timezone=pytz.timezone('Australia/Sydney'))
def my_job(text):
now = datetime.now()
print(f'{now} text: {text}')
@click.group()
def cli():
pass
@click.command()
@click.option('--message', default='<BLANK>', help='to display when ')
@click.option('--crontab', default='*/1 * * * *', help='Timestamp of ')
def add_job(message, crontab):
# entry = CronTab('0 0 ? * TUE,THU')
entry = CronTab(crontab)
number_of_seconds = entry.next()
timestamp = datetime.now(pytz.timezone('Australia/Sydney')) + timedelta(seconds=number_of_seconds)
move_service_message = f'Service {message} will be moved @ {timestamp}'
sched.add_job(
my_job,
'date',
run_date=timestamp,
args=[move_service_message]
)
print('added job:' + move_service_message)
@click.command()
def start():
# this will wait forever
sched.start()
try:
# This is here to simulate application activity (which keeps the main thread alive).
while True:
sleep(10)
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
sched.shutdown()
cli.add_command(start)
cli.add_command(add_job)
if __name__ == "__main__":
exit_code = 0 # assume it will be okay
time_started = datetime.now()
try:
cli()
except Exception as e:
print('Exception:', e)
exit_code = 1
finally:
exit(exit_code)
我的包是最新的,包括
APScheduler 3.6.0
SQLAlchemy 1.3.1
FAQ 中似乎没有我认为可能的内容。虽然只是修改了我的期望,但我仍然会使用它。