如何在 Celery link_error 回调中获取 "full" 异步结果
How to get the "full" async result in Celery link_error callback
我在 Django 1.6.11 和 RabbitMQ 3.5.4 环境下运行 Celery 3.1.18,并试图在失败状态下测试我的异步任务(CELERY_ALWAYS_EAGER=True)。但是,我无法在错误回调中获得正确的 "result"。Celery 文档中的示例如下:
@app.task(bind=True)
def error_handler(self, uuid):
    """Print the result and traceback looked up for the task id ``uuid``."""
    failed = self.app.AsyncResult(uuid)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    print(template.format(uuid, failed.result, failed.traceback))
当我这样做时,得到的结果状态仍然是 "PENDING",且 result.result = ''、result.traceback = ''。但是,我的 .apply_async 调用返回的实际结果却具有正确的 "FAILURE" 状态和回溯。
我的代码(基本上是一个解析 .tar.gz 文件的 Django Rest Framework RESTful 端点,然后在文件解析完成后向用户发送通知):
views.py:
from producer_main.celery import app as celery_app
@celery_app.task()
def _upload_error_simple(uuid):
    """Dump what the result lookup for task id ``uuid`` reports, then build a message.

    Prints the id, then the backend, state, result and traceback of the
    ``AsyncResult`` obtained for it. The formatted message is assigned to
    ``msg`` but not used further in this snippet.
    """
    print(uuid)
    outcome = celery_app.AsyncResult(uuid)
    print(outcome.backend)
    print(outcome.state)
    print(outcome.result)
    print(outcome.traceback)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    msg = template.format(uuid, outcome.result, outcome.traceback)
class UploadNewFile(APIView):
    """Endpoint that stores an uploaded file and queues ``import_file`` for it."""

    def post(self, request, repository_id, format=None):
        """Save the first uploaded file, dispatch the import task, and respond.

        ``repository_id`` and ``format`` are accepted but not used in this body.
        The exception types listed in the ``except`` clause are handed to
        ``gutils.handle_exceptions``.
        """
        try:
            # Takes the entry stored under the first key of self.data['files'].
            # NOTE(review): indexing .keys() only works where keys() returns a
            # list (Python 2 dicts) -- confirm the type of self.data['files'].
            uploaded_file = self.data['files'][self.data['files'].keys()[0]]
            self.path = default_storage.save('{0}/{1}'.format(settings.MEDIA_ROOT,
                                                              uploaded_file.name),
                                             uploaded_file)
            print type(import_file)
            # The errback signature has no arguments of its own.
            # NOTE(review): Celery presumably supplies the failed task's id
            # when it invokes the errback -- confirm against the Celery docs.
            self.async_result = import_file.apply_async((self.path, request.user),
                                                        link_error=_upload_error_simple.s())
            # Debug output describing the result object returned by apply_async.
            print 'results from self.async_result:'
            print self.async_result.id
            print self.async_result.backend
            print self.async_result.state
            print self.async_result.result
            print self.async_result.traceback
            return Response()
        except (PermissionDenied, InvalidArgument, NotFound, KeyError) as ex:
            # NOTE(review): nothing is returned on this path unless
            # handle_exceptions raises -- verify that is intended.
            gutils.handle_exceptions(ex)
tasks.py:
from producer_main.celery import app
from utilities.general import upload_class
@app.task
def import_file(path, user):
    """Asynchronously import a course.

    Declared as a task on ``app``; forwards ``path`` and ``user`` unchanged
    to ``upload_class``.
    """
    upload_class(path, user)
celery.py:
"""
As described in
http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
from __future__ import absolute_import
import os
import logging
from celery import Celery
# Provide a default settings module before django.conf.settings is imported below.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'producer_main.settings')
from django.conf import settings
log = logging.getLogger(__name__)
app = Celery('producer') # pylint: disable=invalid-name
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# The lambda defers reading settings.INSTALLED_APPS until it is called.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # pragma: no cover
@app.task(bind=True)
def debug_task(self):
    """Print the repr of this task's request."""
    request_line = 'Request: {0!r}'.format(self.request)
    print(request_line)
我的后端是这样配置的:
# NOTE(review): eager mode presumably runs tasks in-process and returns an
# already-evaluated result object without writing to the result backend; if so,
# an AsyncResult lookup by id would keep reporting PENDING -- confirm against
# the Celery documentation for the installed version.
CELERY_ALWAYS_EAGER = True
# NOTE(review): presumably keeps exceptions from eagerly-run tasks from being
# re-raised in the caller -- TODO confirm.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'redis://localhost'
CELERY_RESULT_PERSISTENT = True
CELERY_IGNORE_RESULT = False
当我运行针对 link_error 状态的单元测试时,得到如下输出:
Creating test database for alias 'default'...
<class 'celery.local.PromiseProxy'>
130ccf13-c2a0-4bde-8d49-e17eeb1b0115
<celery.backends.redis.RedisBackend object at 0x10aa2e110>
PENDING
None
None
results from self.async_result:
130ccf13-c2a0-4bde-8d49-e17eeb1b0115
None
FAILURE
Non .zip / .tar.gz file passed in.
Traceback (most recent call last):
所以任务结果在我的 _upload_error_simple()
方法中不可用,但它们可从 self.async_result
返回的变量中获得...
您似乎将 _upload_error()
作为 class 的绑定方法 - 这可能不是您想要的。试着让它成为一个独立的任务:
@celery_app.task(bind=True)
def _upload_error(self, uuid):
    """Build a failure message for the task id ``uuid``.

    The message is assigned to ``msg`` but not used further in this snippet.
    """
    failed = celery_app.AsyncResult(uuid)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    msg = template.format(uuid, failed.result, failed.traceback)
class Whatever(object):
    # The rest of the class is elided in this snippet.
    ....
        # link_error is given the module-level _upload_error task signature;
        # link still uses a signature taken from self._upload_success.
        self.async_result = import_file.apply_async((self.path, request.user),
                                                    link=self._upload_success.s(
                                                        "Upload finished."),
                                                    link_error=_upload_error.s())
实际上不需要 self
参数,因为它没有被使用,所以你可以这样做:
@celery_app.task()
def _upload_error(uuid):
    """Build a failure message for the task id ``uuid``.

    The message is assigned to ``msg`` but not used further in this snippet.
    """
    failed = celery_app.AsyncResult(uuid)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    msg = template.format(uuid, failed.result, failed.traceback)
请注意缺少 bind=True
和 self
我无法使 link 和 link_error 回调正常工作,因此最终不得不改用 on_failure 和 on_success 任务处理方法(原文此处指向说明文档的链接已丢失)。我的 tasks.py 如下所示:
class ErrorHandlingTask(Task):
    """Abstract task base that composes a message on failure and on success."""

    abstract = True

    def on_failure(self, exc, task_id, targs, tkwargs, einfo):
        """Compose a failure message from the first task argument and ``exc``.

        Only the part of ``targs[0]`` after its last '/' is included. The
        message is assigned to ``msg`` but not used further in this snippet.
        """
        file_name = targs[0].rsplit('/', 1)[-1]
        error_text = str(exc)
        msg = 'Import of {0} raised exception: {1!r}'.format(file_name, error_text)

    def on_success(self, retval, task_id, targs, tkwargs):
        """Compose the fixed success message (not used further in this snippet)."""
        msg = "Upload successful. You may now view your course."
@app.task(base=ErrorHandlingTask)
def import_file(path, user):
    """Asynchronously import a course.

    Declared as a task on ``app`` with ``ErrorHandlingTask`` as its base;
    forwards ``path`` and ``user`` unchanged to ``upload_class``.
    """
    upload_class(path, user)
小心 UUID 实例!如果您用来查询任务状态的 ID 不是 string 类型而是 UUID 类型,那么您只会得到 PENDING 状态。
from uuid import UUID
from celery.result import AsyncResult
task_id = UUID('d4337c01-4402-48e9-9e9c-6e9919d5e282')
# Look the task up with the UUID object itself.
print(AsyncResult(task_id).state)
# PENDING
# Look the same task up with the id converted to a string.
print(AsyncResult(str(task_id)).state)
# SUCCESS
我在 Django 1.6.11 和 RabbitMQ 3.5.4 环境下运行 Celery 3.1.18,并试图在失败状态下测试我的异步任务(CELERY_ALWAYS_EAGER=True)。但是,我无法在错误回调中获得正确的 "result"。Celery 文档中的示例如下:
@app.task(bind=True)
def error_handler(self, uuid):
    """Print the result and traceback looked up for the task id ``uuid``."""
    failed = self.app.AsyncResult(uuid)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    print(template.format(uuid, failed.result, failed.traceback))
当我这样做时,得到的结果状态仍然是 "PENDING",且 result.result = ''、result.traceback = ''。但是,我的 .apply_async 调用返回的实际结果却具有正确的 "FAILURE" 状态和回溯。
我的代码(基本上是一个解析 .tar.gz 文件的 Django Rest Framework RESTful 端点,然后在文件解析完成后向用户发送通知):
views.py:
from producer_main.celery import app as celery_app
@celery_app.task()
def _upload_error_simple(uuid):
    """Dump what the result lookup for task id ``uuid`` reports, then build a message.

    Prints the id, then the backend, state, result and traceback of the
    ``AsyncResult`` obtained for it. The formatted message is assigned to
    ``msg`` but not used further in this snippet.
    """
    print(uuid)
    outcome = celery_app.AsyncResult(uuid)
    print(outcome.backend)
    print(outcome.state)
    print(outcome.result)
    print(outcome.traceback)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    msg = template.format(uuid, outcome.result, outcome.traceback)
class UploadNewFile(APIView):
    """Endpoint that stores an uploaded file and queues ``import_file`` for it."""

    def post(self, request, repository_id, format=None):
        """Save the first uploaded file, dispatch the import task, and respond.

        ``repository_id`` and ``format`` are accepted but not used in this body.
        The exception types listed in the ``except`` clause are handed to
        ``gutils.handle_exceptions``.
        """
        try:
            # Takes the entry stored under the first key of self.data['files'].
            # NOTE(review): indexing .keys() only works where keys() returns a
            # list (Python 2 dicts) -- confirm the type of self.data['files'].
            uploaded_file = self.data['files'][self.data['files'].keys()[0]]
            self.path = default_storage.save('{0}/{1}'.format(settings.MEDIA_ROOT,
                                                              uploaded_file.name),
                                             uploaded_file)
            print type(import_file)
            # The errback signature has no arguments of its own.
            # NOTE(review): Celery presumably supplies the failed task's id
            # when it invokes the errback -- confirm against the Celery docs.
            self.async_result = import_file.apply_async((self.path, request.user),
                                                        link_error=_upload_error_simple.s())
            # Debug output describing the result object returned by apply_async.
            print 'results from self.async_result:'
            print self.async_result.id
            print self.async_result.backend
            print self.async_result.state
            print self.async_result.result
            print self.async_result.traceback
            return Response()
        except (PermissionDenied, InvalidArgument, NotFound, KeyError) as ex:
            # NOTE(review): nothing is returned on this path unless
            # handle_exceptions raises -- verify that is intended.
            gutils.handle_exceptions(ex)
tasks.py:
from producer_main.celery import app
from utilities.general import upload_class
@app.task
def import_file(path, user):
    """Asynchronously import a course.

    Declared as a task on ``app``; forwards ``path`` and ``user`` unchanged
    to ``upload_class``.
    """
    upload_class(path, user)
celery.py:
"""
As described in
http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
from __future__ import absolute_import
import os
import logging
from celery import Celery
# Provide a default settings module before django.conf.settings is imported below.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'producer_main.settings')
from django.conf import settings
log = logging.getLogger(__name__)
app = Celery('producer') # pylint: disable=invalid-name
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# The lambda defers reading settings.INSTALLED_APPS until it is called.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # pragma: no cover
@app.task(bind=True)
def debug_task(self):
    """Print the repr of this task's request."""
    request_line = 'Request: {0!r}'.format(self.request)
    print(request_line)
我的后端是这样配置的:
# NOTE(review): eager mode presumably runs tasks in-process and returns an
# already-evaluated result object without writing to the result backend; if so,
# an AsyncResult lookup by id would keep reporting PENDING -- confirm against
# the Celery documentation for the installed version.
CELERY_ALWAYS_EAGER = True
# NOTE(review): presumably keeps exceptions from eagerly-run tasks from being
# re-raised in the caller -- TODO confirm.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'redis://localhost'
CELERY_RESULT_PERSISTENT = True
CELERY_IGNORE_RESULT = False
当我运行针对 link_error 状态的单元测试时,得到如下输出:
Creating test database for alias 'default'...
<class 'celery.local.PromiseProxy'>
130ccf13-c2a0-4bde-8d49-e17eeb1b0115
<celery.backends.redis.RedisBackend object at 0x10aa2e110>
PENDING
None
None
results from self.async_result:
130ccf13-c2a0-4bde-8d49-e17eeb1b0115
None
FAILURE
Non .zip / .tar.gz file passed in.
Traceback (most recent call last):
所以任务结果在我的 _upload_error_simple()
方法中不可用,但它们可从 self.async_result
返回的变量中获得...
您似乎将 _upload_error()
作为 class 的绑定方法 - 这可能不是您想要的。试着让它成为一个独立的任务:
@celery_app.task(bind=True)
def _upload_error(self, uuid):
    """Build a failure message for the task id ``uuid``.

    The message is assigned to ``msg`` but not used further in this snippet.
    """
    failed = celery_app.AsyncResult(uuid)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    msg = template.format(uuid, failed.result, failed.traceback)
class Whatever(object):
    # The rest of the class is elided in this snippet.
    ....
        # link_error is given the module-level _upload_error task signature;
        # link still uses a signature taken from self._upload_success.
        self.async_result = import_file.apply_async((self.path, request.user),
                                                    link=self._upload_success.s(
                                                        "Upload finished."),
                                                    link_error=_upload_error.s())
实际上不需要 self
参数,因为它没有被使用,所以你可以这样做:
@celery_app.task()
def _upload_error(uuid):
    """Build a failure message for the task id ``uuid``.

    The message is assigned to ``msg`` but not used further in this snippet.
    """
    failed = celery_app.AsyncResult(uuid)
    template = 'Task {0} raised exception: {1!r}\n{2!r}'
    msg = template.format(uuid, failed.result, failed.traceback)
请注意缺少 bind=True
和 self
我无法使 link 和 link_error 回调正常工作,因此最终不得不改用 on_failure 和 on_success 任务处理方法(原文此处指向说明文档的链接已丢失)。我的 tasks.py 如下所示:
class ErrorHandlingTask(Task):
    """Abstract task base that composes a message on failure and on success."""

    abstract = True

    def on_failure(self, exc, task_id, targs, tkwargs, einfo):
        """Compose a failure message from the first task argument and ``exc``.

        Only the part of ``targs[0]`` after its last '/' is included. The
        message is assigned to ``msg`` but not used further in this snippet.
        """
        file_name = targs[0].rsplit('/', 1)[-1]
        error_text = str(exc)
        msg = 'Import of {0} raised exception: {1!r}'.format(file_name, error_text)

    def on_success(self, retval, task_id, targs, tkwargs):
        """Compose the fixed success message (not used further in this snippet)."""
        msg = "Upload successful. You may now view your course."
@app.task(base=ErrorHandlingTask)
def import_file(path, user):
    """Asynchronously import a course.

    Declared as a task on ``app`` with ``ErrorHandlingTask`` as its base;
    forwards ``path`` and ``user`` unchanged to ``upload_class``.
    """
    upload_class(path, user)
小心 UUID 实例!如果您用来查询任务状态的 ID 不是 string 类型而是 UUID 类型,那么您只会得到 PENDING 状态。
from uuid import UUID
from celery.result import AsyncResult
task_id = UUID('d4337c01-4402-48e9-9e9c-6e9919d5e282')
# Look the task up with the UUID object itself.
print(AsyncResult(task_id).state)
# PENDING
# Look the same task up with the id converted to a string.
print(AsyncResult(str(task_id)).state)
# SUCCESS