使用导入的装饰器包装 app.task 函数时出现 Celery KeyError;仅导入错误
Celery KeyError when wrapping app.task function with imported decorator; errors only w/ import
给定布局:
background \
tasks \
__init__.py
generic.py
helpers.py
__init__.py
_server.py
config.py
router.py
server.py
并用 celery -A background._server worker
启动 _server.py
当我尝试使用 .delay(..)
调用 generic.adder
函数时,我在 Worker 中得到了一个 KeyError: u'generic.adder'
加法函数:
文件generic.py
from background.server import app
from background.tasks.helpers import standardized_task
@standardized_task(app, name='generic.adder')
def adder(x, y):
return x + y
..被一个函数包装,该函数采用 app
实例并将 Celery Task 的 input/output 标准化为 JSON 对象,该对象 returns 结果和功能。 (包含在下面)但是,问题是 当这个包装函数与 generic.adder 在同一个文件中时,它可以完美地工作——当它被导入并用作上面它抛出关键错误.
我被引导相信包装器正在以某种方式修改传递给 app.task
的 name=..
属性,函数名称来自 helpers.py
,这导致 generic.adder
从任务访问时找不到。
同样重要的是要注意,如果您尝试从 _server.py
(来自 celery CLI 的模块 运行)内部调用 adder(..)
,它会完美运行;只有在通过分布式接口调用时才会抛出错误;意思是,导入工作独立于 Celery。
文件helpers.py
__author__ = 'Blake'
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
def standardized_task(app, *args, **kwargs):
def wrapped_task(fn):
def wrapped_fn(*fnargs, **fnkwargs):
throws = fnkwargs.get('throws', Exception)
raises = fnkwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = fn(*fnargs, **fnkwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': fnargs, 'kwargs': fnkwargs
}
}
return app.task(wrapped_fn, *args, **kwargs)
return wrapped_task
文件_server.py
from background.server import app
from background.tasks.generic import *
答案不是使用装饰器,而是将celery.Task扩展为抽象class并使用,@app.task(name='...', base=MyNewAbstractTask)
下面的 SO post 解释得更好:
celery task and customize decorator
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
class StandardizedTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
return self.inner_run(*args, **kwargs)
def inner_run(self, *args, **kwargs):
throws = kwargs.get('throws', Exception)
raises = kwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = self.run(*args, **kwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': args, 'kwargs': kwargs }}
给定布局:
background \
tasks \
__init__.py
generic.py
helpers.py
__init__.py
_server.py
config.py
router.py
server.py
并用 celery -A background._server worker
_server.py
当我尝试使用 .delay(..)
generic.adder
函数时,我在 Worker 中得到了一个 KeyError: u'generic.adder'
加法函数:
文件generic.py
from background.server import app
from background.tasks.helpers import standardized_task
@standardized_task(app, name='generic.adder')
def adder(x, y):
return x + y
..被一个函数包装,该函数采用 app
实例并将 Celery Task 的 input/output 标准化为 JSON 对象,该对象 returns 结果和功能。 (包含在下面)但是,问题是 当这个包装函数与 generic.adder 在同一个文件中时,它可以完美地工作——当它被导入并用作上面它抛出关键错误.
我被引导相信包装器正在以某种方式修改传递给 app.task
的 name=..
属性,函数名称来自 helpers.py
,这导致 generic.adder
从任务访问时找不到。
同样重要的是要注意,如果您尝试从 _server.py
(来自 celery CLI 的模块 运行)内部调用 adder(..)
,它会完美运行;只有在通过分布式接口调用时才会抛出错误;意思是,导入工作独立于 Celery。
文件helpers.py
__author__ = 'Blake'
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
def standardized_task(app, *args, **kwargs):
def wrapped_task(fn):
def wrapped_fn(*fnargs, **fnkwargs):
throws = fnkwargs.get('throws', Exception)
raises = fnkwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = fn(*fnargs, **fnkwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': fnargs, 'kwargs': fnkwargs
}
}
return app.task(wrapped_fn, *args, **kwargs)
return wrapped_task
文件_server.py
from background.server import app
from background.tasks.generic import *
答案不是使用装饰器,而是将celery.Task扩展为抽象class并使用,@app.task(name='...', base=MyNewAbstractTask)
下面的 SO post 解释得更好:
celery task and customize decorator
import types
JSON_TYPES = [
dict, list, unicode, str, int, long, float, bool, types.NoneType
]
class StandardizedTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
return self.inner_run(*args, **kwargs)
def inner_run(self, *args, **kwargs):
throws = kwargs.get('throws', Exception)
raises = kwargs.get('raises', False)
if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
raise ValueError('throws value not of type Exception: %s' % type(throws))
result, error = None, None
try:
result = self.run(*args, **kwargs)
if type(result) not in JSON_TYPES:
result = unicode(result)
except throws, e:
error = e
if raises:
raise
finally:
return {
'result': result,
'error': str(error) if error else None,
'meta': {
'args': args, 'kwargs': kwargs }}