通过视图(Django)的芹菜任务返回为待处理,但可以通过终端

Celery tasks through view (Django) returned as pending, but okay via Terminal

这个问题之前已经讨论过,看了很多帖子,我至今无法找到解决这个问题的方法。我是芹菜的新手,所以我的学习曲线仍然相当陡峭。在我当前的脚本下方:

myapp.__init__.py

from __future__ import absolute_import, unicode_literals
from .celery_main import app as celery_app # Ensures app is always imported when Django starts so that shared_task will use this app.

__all__ = ['celery_app']

myapp.celery_main.py

from __future__ import absolute_import
from celery import Celery
from django.apps import apps

# Initialise the app
app = Celery()                          
app.config_from_object('myapp.celeryconfig')  # WORKS WHEN CALLED THROUGH VIEW/DJANGO: Tell Celery instance to use celeryconfig module
#app.config_from_object('celeryconfig')  # WORKS WHEN CALLED THROUGH TERMINAL

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

myapp.celeryconfig.py

from __future__ import absolute_import, unicode_literals
from datetime import timedelta

## List of modules to import when celery starts.
CELERY_IMPORTS = ('celery_tasks',)

## Message Broker (RabbitMQ) settings.
BROKER_URL = 'amqp://'
BROKER_PORT = 5672

## Result store settings.
CELERY_RESULT_BACKEND = 'rpc://'

## Misc
#CELERY_IGNORE_RESULT = False
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_ENABLE_UTC = True

CELERYBEAT_SCHEDULE = {
    'doctor-every-10-seconds': {
        'task': 'celery_tasks.fav_doctor',
        'schedule': timedelta(seconds=3),
    },
}

myapp.celery_tasks.py

from __future__ import absolute_import
from celery.task import task

suf = lambda n: "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th"))
@task
def fav_doctor():
    # Stuff happend here

@task
def reverse(string):
    return string[::-1]

@task
def send_email(user_id):
    # Stuff happend here

@task
def add(x, y):
    return x+y

anotherapp.settings.py

INSTALLED_APPS = [
    ...
    'kombu.transport.django',
]

myapp.views.admin_scripts.py

from celery.result import AsyncResult
from myapp.celery_tasks import fav_doctor, reverse, send_email, add
from myapp.celery_main import app

@login_required
def admin_script_dashboard(request):
    if request.method == 'POST':
        form = Admin_Script(request.POST)
        if form.is_valid():
                # Results
                async_result = add.delay(2, 5)
                task_id = async_result.task_id
                res = AsyncResult(async_result)
                res_1 = add.AsyncResult(async_result)
                res_2 = add.AsyncResult(async_result.id)
                print ("async_result: {0}\ntask_id: {1}\nres: {2}\nres_1: {3}\nres_2: {4}".format(async_result, task_id, res, res_1, res_2))

                # Backend: Make sure the client is configured with the right backend
                print("Backend check: {0}".format(async_result.backend))

                # States/statuses
                task_state = res.state
                A = async_result.status
                B = res.status
                print ("task_state: {0}\nA: {1}\nB: {2}".format(task_state, A, B))

通过我的 django 应用程序触发 celery worker 的结果(与 app.views.admin_scripts.py 中的打印语句相关):

async_result: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
task_id: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res_1: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res_2: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
Backend check: <celery.backends.rpc.RPCBackend object at 0x106e308d0>
task_state: PENDING
A: PENDING
B: PENDING

触发终端输出:

[2018-07-15 21:41:47,015: ERROR/MainProcess] Received unregistered task of type 'MyApp.celery_tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see <link> for more information.

The full contents of the message body was:
{'task': 'MyApp.celery_tasks.add', 'id': 'b21ffa43-d1f1-4767-9ab8-e58afec3ea0f', 'args': [2, 5], 'kwargs': {}, 'retries': 0, 'eta': None, 'expires': None, 'utc': True, 'callbacks': None, 'errbacks': None, 'timelimit': [None, None], 'taskset': None, 'chord': None} (266b)
Traceback (most recent call last):
  File "/Users/My_MBP/anaconda3/lib/python3.6/site-packages/celery/worker/consumer.py", line 465, in on_task_received
    strategies[type_](message, body,
KeyError: 'MyApp.celery_tasks.add'

我有几个问题:

1. 我可以在终端中使用命令触发预期结果:

celery -A celery_tasks worker -l info

然后在 Python shell:

from celery_tasks import *
add.delay(2,3) 

哪个成功:

[2018-07-13 10:12:14,943: INFO/MainProcess] Received task: celery_tasks.add[c100ad91-2f94-40b1-bb0e-9bc2990ff3bc]
[2018-07-13 10:12:14,961: INFO/MainProcess] Task celery_tasks.add[c100ad91-2f94-40b1-bb0e-9bc2990ff3bc] succeeded in 0.017578680999577045s: 54

所以在终端中执行任务是可行的,但在 Django 中的 view.py 中却不行,为什么不呢?

2. 可能与 1. 有关:我必须,烦人地,在 app.celery_main.py 中配置 app.config_from_object 取决于我是想通过 Django 还是通过终端进行测试。您可以看到我将 celeryconfig.py 设置为带有 myapp 名称前缀或不带前缀。否则,将抛出一条错误消息。我怀疑某种导入循环导致了这里的问题(尽管我可能是错的)但我不知道 why/where。我该如何克服这个问题?

3. 在我的 settings.py 文件中(不是 celeryconfig.py)我已经在 INSTALLED_APPS: 'kombu.transport.django' 中进行了配置。这是必要的吗?我正在使用 celery 3.1.26.post2 (Cipater)

4. 在我的所有文件中,我都在顶部:

from __future__ import absolute_import, unicode_literals

这到底是为了什么目的,对于 3.1.26 来说是必需的吗?

5. 我读到 here,您需要确保客户端配置了正确的后端。但我不确定这到底是什么意思。我的打印输出是(根据 app.views.admin_scripts.py):

Backend check: <celery.backends.rpc.RPCBackend object at 0x106e308d0>

如果您发现我的代码有任何异常,请随时告诉我。

  1. Celery 任务应该 have proper names。当从 django 运行ning 时,任务名称是 MyApp.celery_tasks.add 这就是 celery worker 不能 运行 的原因。但是当您使用 from celery_tasks import * 导入时从终端,任务名称是 celery_tasks.add 这就是它正常工作的原因。

  2. 您可以根据环境变量更改配置。

  3. kombu.transport.django 无需添加。

  4. 这与Python 2/3有关。有关详细信息,请参阅 this docs

  5. 如果你想要任务完成后的任务结果,它应该存储在某个地方。所以这个需要后端。如果你不想检索结果,你不需要这个。

我仍在尝试找出问题 2 的答案,同时我已经想出了如何检索所需结果的方法:我有 async_result = add.delay(2, 5) 但在此之后 我需要 async_result.get() 后跟 task_output = async_result.result。结果status/state(async_result.stateasync_result.status)然后设置成功。