Python Celery根本不执行周期性任务

Python Celery does not execute periodic tasks at all

我从 2 天开始就尝试让 celery 执行周期性任务,但我一点运气都没有。可以请有人帮助我。手动触发任务按预期工作。但是,如果我像配置的那样等待 60 秒,那么控制台上什么也不会发生。我已经阅读了整个文档,也完全从头开始编写了我的配置,运气不好,任务不会自己执行:(

我是否必须明确定义

我正在与:

初始化.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
default_app_config = 'MyApp.apps.AccountsConfig'

celery.py

from celery import Celery
import environ
from django.conf import settings
import celery

env = environ.Env()

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "MyApp.settings")

app = Celery(settings.SITE_NAME)
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

settings.py

BROKER_URL = 'redis://' + ':' + env.str('REDIS_PWD') + '@' + env.str('REDIS_HOST') + ":" + env.str('REDIS_PORT')
BROKER_TRANSPORT = 'redis'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_RESULT_EXPIRES = 7200
CELERY_REDIS_HOST = env.str('REDIS_HOST')
REDIS_PWD = env.str('REDIS_PWD')
CELERY_REDIS_PORT = env.str('REDIS_PORT')
CELERY_REDIS_DB = env.str('REDIS_CELERY_DB')
CELERY_CACHE_BACKEND = 'default'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERYD_TASK_SOFT_TIME_LIMIT = 3600
DJANGO_REDIS_LOG_IGNORED_EXCEPTIONS = True

tasks.py:

@app.on_after_finalize.connect
def app_ready(**kwargs):
    """
    Called once after app has been finalized.
    """
    sender = kwargs.get('sender')

    sender.add_periodic_task(60, self_check_beat.s('Celery Beat self check'))# 1 min.

@app.task(name="Celery Beat self check", ignore_result=False)
def self_check_beat(arg):
return "Celery Self-Check Task says <<HELLO>>"

知道为什么 celery 不会每 5 分钟触发一次任务吗?

app.conf.beat_schedule={
"task":"your task",
"schedule":"your schedule"
}

您是否在 celery.py 文件中添加了上述代码段?

https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html