Celery 4.1 周期性任务错误

Celery 4.1 periodic tasks error

我正在尝试设置一个任务 运行 每十 seconds.Using Celery Beat。

我正在使用:

 Django==1.11.3
 celery==4.1.0
 django-celery-beat==1.1.1
 django-celery-results==1.0.1

它给我以下错误:

收到类型为'operations.tasks.message'

的未注册任务

我是 Celery 的新手,我尝试了很多解决方案,但似乎找不到解决方案,希望得到帮助

settings.py

CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Johannesburg'
CELERY_BEAT_SCHEDULE = {
        'message': {
        'task': 'operations.tasks.message',
        'schedule': 10.0
    }
    }

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nodiso.settings')

app = Celery('nodiso')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

task.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
from operations import models
from .celery import periodic_task



@task
def message():
    t = models.Celerytest.objects.create(Message='Hello World')
    t.save()

文件结构

proj-
     proj-
         __init__.py
         settings.py-
         celery.py-
     app-
         tasks.py-

在我的 celery.py 文件中,我这样定义 app

app = Celery(
    'your_celery_app_name',
    include=[
        'your_celery_app_name.module.task1',
        'your_celery_app_name.module.task2',
    ]
)
app.config_from_object('your_celery_app_name.celeryconfig')

我的 celeryconfig.py 是我定义节拍和其他设置的地方(我认为这与您的 settings.py 相同)。

下面可能不相关 - 我不是 Python 的专家,也不是应该如何将包放在一起的专家 - 但根据我有限的理解,你的任务应该是你的 celery 应用程序模块的子模块。不过,请用少许盐服用。

我的项目结构看起来更像这样:

your_celery_app_name (dir)
    setup.py (file)
    your_celery_app_name (dir)
        __init__.py (file)
        celery.py (file)
        celeryconfig.py (file)
        module (dir)
            __init__.py (importing task1 and task2 from tasks)
            tasks.py (implementing task1 and task2)