Celery tasks through view (Django) returned as pending, but okay via Terminal
This question has been discussed before, and although I have read many posts, I still haven't been able to solve it. I'm new to Celery, so my learning curve is still quite steep. Below are my current scripts:
myapp.__init__.py
from __future__ import absolute_import, unicode_literals
from .celery_main import app as celery_app # Ensures app is always imported when Django starts so that shared_task will use this app.
__all__ = ['celery_app']
myapp.celery_main.py
from __future__ import absolute_import
from celery import Celery
from django.apps import apps
# Initialise the app
app = Celery()
app.config_from_object('myapp.celeryconfig') # WORKS WHEN CALLED THROUGH VIEW/DJANGO: Tell Celery instance to use celeryconfig module
#app.config_from_object('celeryconfig') # WORKS WHEN CALLED THROUGH TERMINAL
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])
myapp.celeryconfig.py
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
## List of modules to import when celery starts.
CELERY_IMPORTS = ('celery_tasks',)
## Message Broker (RabbitMQ) settings.
BROKER_URL = 'amqp://'
BROKER_PORT = 5672
## Result store settings.
CELERY_RESULT_BACKEND = 'rpc://'
## Misc
#CELERY_IGNORE_RESULT = False
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_ENABLE_UTC = True
CELERYBEAT_SCHEDULE = {
    'doctor-every-10-seconds': {
        'task': 'celery_tasks.fav_doctor',
        'schedule': timedelta(seconds=3),
    },
}
myapp.celery_tasks.py
from __future__ import absolute_import
from celery.task import task
suf = lambda n: "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th"))
@task
def fav_doctor():
    # Stuff happened here
    pass

@task
def reverse(string):
    return string[::-1]

@task
def send_email(user_id):
    # Stuff happened here
    pass

@task
def add(x, y):
    return x + y
anotherapp.settings.py
INSTALLED_APPS = [
    ...
    'kombu.transport.django',
]
myapp.views.admin_scripts.py
from celery.result import AsyncResult
from myapp.celery_tasks import fav_doctor, reverse, send_email, add
from myapp.celery_main import app
@login_required
def admin_script_dashboard(request):
    if request.method == 'POST':
        form = Admin_Script(request.POST)
        if form.is_valid():
            # Results
            async_result = add.delay(2, 5)
            task_id = async_result.task_id
            res = AsyncResult(async_result)
            res_1 = add.AsyncResult(async_result)
            res_2 = add.AsyncResult(async_result.id)
            print("async_result: {0}\ntask_id: {1}\nres: {2}\nres_1: {3}\nres_2: {4}".format(async_result, task_id, res, res_1, res_2))
            # Backend: make sure the client is configured with the right backend
            print("Backend check: {0}".format(async_result.backend))
            # States/statuses
            task_state = res.state
            A = async_result.status
            B = res.status
            print("task_state: {0}\nA: {1}\nB: {2}".format(task_state, A, B))
Results of triggering the Celery worker through my Django app (these relate to the print statements in app.views.admin_scripts.py):
async_result: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
task_id: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res_1: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res_2: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
Backend check: <celery.backends.rpc.RPCBackend object at 0x106e308d0>
task_state: PENDING
A: PENDING
B: PENDING
Terminal output from the worker when triggered:
[2018-07-15 21:41:47,015: ERROR/MainProcess] Received unregistered task of type 'MyApp.celery_tasks.add'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see <link> for more information.
The full contents of the message body was:
{'task': 'MyApp.celery_tasks.add', 'id': 'b21ffa43-d1f1-4767-9ab8-e58afec3ea0f', 'args': [2, 5], 'kwargs': {}, 'retries': 0, 'eta': None, 'expires': None, 'utc': True, 'callbacks': None, 'errbacks': None, 'timelimit': [None, None], 'taskset': None, 'chord': None} (266b)
Traceback (most recent call last):
File "/Users/My_MBP/anaconda3/lib/python3.6/site-packages/celery/worker/consumer.py", line 465, in on_task_received
strategies[type_](message, body,
KeyError: 'MyApp.celery_tasks.add'
I have several questions:
1. I can trigger the expected result in the terminal with the command:
celery -A celery_tasks worker -l info
and then, in a Python shell:
from celery_tasks import *
add.delay(2,3)
which succeeds:
[2018-07-13 10:12:14,943: INFO/MainProcess] Received task: celery_tasks.add[c100ad91-2f94-40b1-bb0e-9bc2990ff3bc]
[2018-07-13 10:12:14,961: INFO/MainProcess] Task celery_tasks.add[c100ad91-2f94-40b1-bb0e-9bc2990ff3bc] succeeded in 0.017578680999577045s: 54
So executing the tasks works in the terminal, but not from the Django view. Why not?
2. Possibly related to 1.: I have to, annoyingly, switch app.config_from_object in app.celery_main.py depending on whether I want to test through Django or through the terminal. As you can see, I point it at celeryconfig.py either with or without the myapp name prefix; otherwise an error is thrown. I suspect some kind of circular import is causing the problem here (though I may be wrong), but I don't know why or where. How can I overcome this?
3. In my settings.py file (not celeryconfig.py) I have configured 'kombu.transport.django' in INSTALLED_APPS. Is this necessary? I am using celery 3.1.26.post2 (Cipater).
4. In all my files I have the following at the top:
from __future__ import absolute_import, unicode_literals
What exactly is this for, and is it required for 3.1.26?
5. I read here that you need to make sure the client is configured with the right backend, but I'm not sure what exactly that means. My printout (per app.views.admin_scripts.py) is:
Backend check: <celery.backends.rpc.RPCBackend object at 0x106e308d0>
If you notice anything else unusual in my code, please feel free to let me know.
Celery tasks should have proper names. When run from Django, the task name is MyApp.celery_tasks.add, which is why the celery worker cannot run it. But when you import it with from celery_tasks import * from the terminal, the task name is celery_tasks.add, which is why it works.
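One way to make the registered name independent of how the module happens to be imported is to give the task an explicit name. Below is a minimal sketch of that idea using the same Celery 3.1 @task decorator as in the question; the name string 'celery_tasks.add' is an assumption, chosen to match what a worker started with -A celery_tasks registers:
from __future__ import absolute_import
from celery.task import task

# Pinning the name means the worker and the Django client agree on
# 'celery_tasks.add' regardless of the import path used to load the module.
@task(name='celery_tasks.add')
def add(x, y):
    return x + y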
You can change the configuration based on an environment variable.
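For instance, here is a minimal sketch of switching the config module on an environment variable in celery_main.py; the variable name RUN_CONTEXT is hypothetical, not something Celery defines:
import os
from celery import Celery

app = Celery()

# Assumed convention: export RUN_CONTEXT=django before starting Django,
# and leave it unset when starting the worker from the terminal.
if os.environ.get('RUN_CONTEXT') == 'django':
    app.config_from_object('myapp.celeryconfig')
else:
    app.config_from_object('celeryconfig')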
There is no need to add kombu.transport.django.
This is related to Python 2/3 compatibility; see the docs for details.
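For context, on Python 2 a bare import inside a package first resolves to a sibling module of the same name; from __future__ import absolute_import disables that, which only matters if one of your modules shadows a library. A hypothetical sketch (the file name myapp/celery.py is invented for illustration):
# hypothetical myapp/celery.py on Python 2
from __future__ import absolute_import  # make the next import find the installed library,
from celery import Celery               # not this myapp/celery.py module itself

app = Celery()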
If you want a task's result after it has completed, the result has to be stored somewhere, and that is what the backend is for. If you do not want to retrieve results, you do not need one.
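If you never read the result of a particular task, one option (a sketch, not part of the question's code) is to mark that task as ignoring its result so nothing is written to the backend at all:
from celery.task import task

@task(ignore_result=True)  # fire-and-forget: no result is stored
def send_email(user_id):
    # assumed placeholder body, as in the question
    pass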
I am still trying to figure out the answer to question 2, but in the meantime I have worked out how to retrieve the desired results: I have async_result = add.delay(2, 5), but after this I need async_result.get(), followed by task_output = async_result.result. The result status/state (async_result.state or async_result.status) is then set to SUCCESS.
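A short sketch of that retrieval pattern (the timeout value is an arbitrary choice; get() blocks until the worker has written the result to the rpc:// backend):
async_result = add.delay(2, 5)
async_result.get(timeout=10)         # block until the task has finished
task_output = async_result.result    # 7
print(async_result.state)            # SUCCESS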