芹菜周期性任务不起作用

Celery periodic task not working

我试着关注 django 的芹菜文档。这是我的项目结构:

├── hiren
│   ├── celery_app.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── manage.py
├── reminder
│   ├── admin.py
│   ├── __init__.py
│   ├── migrations
│   ├── models.py
│   ├── serializers.py
│   |── task.py
│   |── tests.py
│   |── views.py

这是我的 settings.py 文件:

BROKER_URL = 'redis://localhost:6379/4'
CELERYBEAT_SCHEDULE = {
    'run-every-5-seconds': {
        'task': 'reminder.task.run',
        'schedule': timedelta(seconds=5),
        'args': (16, 16)
    },
}

和reminder/task.py 文件:

def run():
    print('hello')

当我使用 运行 celery -A hiren beat -l debug 命令时,我没有在终端中看到 "hello" 文本。我缺少什么?

要从任何可调用对象创建任务,您需要使用 task() 装饰器。这将为 run().

创建一个 celery 任务

reminder/task.py:

from celery import Celery

app = Celery('tasks', broker='redis://localhost')

@app.task
def run():
    print('hello')

Celery库在使用前必须实例化,这个实例称为应用程序(简称app)。

如果您使用的是基于“旧”模块的 celery API,那么您可以像这样导入任务装饰器:

from celery import task

@task
def run():
    print('hello')

尽管这会像第一种方法一样创建一个 celery 任务,但不推荐这样做。