如何在 Flask 上启动 Celery Beat
How to start Celery Beat on Flask
我目前有如下所示的 Flask 代码:
app.py
import os
from datetime import timedelta

from flask import Flask

from services.celery_maker import celery, make_celery
# Templates are loaded from ./build and static files from ./build/static,
# both resolved against the current working directory.
template_dir = os.path.abspath('./build/')
app = Flask(
    __name__,
    template_folder=template_dir,
    static_folder=os.path.abspath("./build/static"),
)
app.config['ERROR_404_HELP'] = False
# NOTE(review): `config` is not defined in the visible code; presumably a
# configparser-style object created elsewhere -- confirm.
app.config['SECRET_KEY'] = config.get("DEFAULT", "SECRET_KEY")
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=5)
# NOTE(review): these URLs are stored only in Flask's config. The Celery
# instance from services.celery_maker is built with its own hard-coded URLs
# (broker on Redis database 1), so these values are not what Celery connects
# to -- confirm which pair is intended.
app.config.update(
    CELERY_BROKER_URL='redis://127.0.0.1:6379/0',
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0',
)
app.config['CELERYBEAT_SCHEDULE'] = {
    # Runs every 30 seconds (the entry key still says "every-minute").
    'periodic_task-every-minute': {
        'task': 'periodic_task',
        'schedule': timedelta(seconds=30),
    },
}
# Celery does not read Flask's config by itself, so the schedule above would
# never reach beat. Hand it to the Celery app explicitly. This assignment only
# runs when this module is imported, so beat has to be started against an app
# path that imports it (e.g. `-A app.celery`).
celery.conf.beat_schedule = app.config['CELERYBEAT_SCHEDULE']
@app.route('/')
def view():
    """Handle requests to the root URL with a fixed plain-text greeting."""
    greeting = "Hello, Flask is up and running!"
    return greeting
@celery.task(name="periodic_task")
def periodic_task():
    """Print a greeting to stdout and write one to the module logger.

    The task is registered under the explicit name ``periodic_task`` rather
    than a name derived from the module path.
    """
    stdout_message = 'Hi! from periodic_task'
    log_message = "Hello! from periodic task"
    print(stdout_message)
    logger.info(log_message)
# Start the Flask server in debug mode only when this file is run directly,
# not when it is imported by another module.
if __name__ == "__main__":
    app.run(debug=True)
我把 celery maker 放在单独的文件中,以避免导入相关的错误:
services.celery_maker.py
from celery import Celery
def make_celery(app_name=__name__):
    """Create a Celery application that uses Redis on localhost:6379.

    Args:
        app_name: Name passed to ``Celery`` as the application's main module
            name. Defaults to this module's ``__name__``.

    Returns:
        A ``Celery`` instance whose result backend is Redis database 0 and
        whose broker is Redis database 1.
    """
    backend = "redis://localhost:6379/0"
    # Swap only the trailing database index. A blanket str.replace("0", "1")
    # would also rewrite any other "0" in the URL (for example a 127.0.0.1
    # host or a port such as 6380) as soon as the backend URL changes.
    base_url, _, _db_index = backend.rpartition("/")
    broker = base_url + "/1"
    return Celery(app_name, backend=backend, broker=broker)
# Module-level Celery instance, created at import time with the default app name.
celery = make_celery()
Celery worker 能看到我的任务,但任务根本没有运行,我不知道发生了什么。
对于 Celery 中的周期性任务,你还需要运行 celery beat:beat 负责按计划调度任务,worker 负责执行任务。也就是说,除了 worker 之外,你还需要同时启动 celery beat:
celery beat -A <path_to_worker_created_under_celery_app> -l info
例如,在你的情况下:celery beat -A services.celery_maker.celery -l info(注意:在 Celery 5 及以上版本中,-A 选项必须放在子命令之前,即 celery -A services.celery_maker.celery beat -l info)
我目前有如下所示的 Flask 代码(app.py):
from services.celery_maker import make_celery
from flask import Flask
from datetime import timedelta
# Templates are loaded from ./build and static files from ./build/static,
# both resolved against the current working directory.
template_dir = os.path.abspath('./build/')
app = Flask(
    __name__,
    template_folder=template_dir,
    static_folder=os.path.abspath("./build/static"),
)
app.config['ERROR_404_HELP'] = False
# NOTE(review): `config` is not defined in the visible code; presumably a
# configparser-style object created elsewhere -- confirm.
app.config['SECRET_KEY'] = config.get("DEFAULT", "SECRET_KEY")
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=5)
# NOTE(review): these URLs are stored only in Flask's config. The Celery
# instance from services.celery_maker is built with its own hard-coded URLs
# (broker on Redis database 1), so these values are not what Celery connects
# to -- confirm which pair is intended.
app.config.update(
    CELERY_BROKER_URL='redis://127.0.0.1:6379/0',
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0',
)
app.config['CELERYBEAT_SCHEDULE'] = {
    # Runs every 30 seconds (the entry key still says "every-minute").
    'periodic_task-every-minute': {
        'task': 'periodic_task',
        'schedule': timedelta(seconds=30),
    },
}
# Celery does not read Flask's config by itself, so the schedule above would
# never reach beat. Hand it to the Celery app explicitly. This assignment only
# runs when this module is imported, so beat has to be started against an app
# path that imports it (e.g. `-A app.celery`).
celery.conf.beat_schedule = app.config['CELERYBEAT_SCHEDULE']
@app.route('/')
def view():
    """Handle requests to the root URL with a fixed plain-text greeting."""
    greeting = "Hello, Flask is up and running!"
    return greeting
@celery.task(name="periodic_task")
def periodic_task():
    """Print a greeting to stdout and write one to the module logger.

    The task is registered under the explicit name ``periodic_task`` rather
    than a name derived from the module path.
    """
    stdout_message = 'Hi! from periodic_task'
    log_message = "Hello! from periodic task"
    print(stdout_message)
    logger.info(log_message)
# Start the Flask server in debug mode only when this file is run directly,
# not when it is imported by another module.
if __name__ == "__main__":
    app.run(debug=True)
我把 celery maker 放在单独的文件中,以避免导入相关的错误:
services.celery_maker.py
from celery import Celery
def make_celery(app_name=__name__):
    """Create a Celery application that uses Redis on localhost:6379.

    Args:
        app_name: Name passed to ``Celery`` as the application's main module
            name. Defaults to this module's ``__name__``.

    Returns:
        A ``Celery`` instance whose result backend is Redis database 0 and
        whose broker is Redis database 1.
    """
    backend = "redis://localhost:6379/0"
    # Swap only the trailing database index. A blanket str.replace("0", "1")
    # would also rewrite any other "0" in the URL (for example a 127.0.0.1
    # host or a port such as 6380) as soon as the backend URL changes.
    base_url, _, _db_index = backend.rpartition("/")
    broker = base_url + "/1"
    return Celery(app_name, backend=backend, broker=broker)
# Module-level Celery instance, created at import time with the default app name.
celery = make_celery()
Celery worker 能看到我的任务,但任务根本没有运行,我不知道发生了什么。
对于 Celery 中的周期性任务,你还需要运行 celery beat:beat 负责按计划调度任务,worker 负责执行任务。也就是说,除了 worker 之外,你还需要同时启动 celery beat:
celery beat -A <path_to_worker_created_under_celery_app> -l info
例如,在你的情况下:celery beat -A services.celery_maker.celery -l info(注意:在 Celery 5 及以上版本中,-A 选项必须放在子命令之前,即 celery -A services.celery_maker.celery beat -l info)