使用导入的装饰器包装 app.task 函数时出现 Celery KeyError;仅导入错误

Celery KeyError when wrapping app.task function with imported decorator; errors only w/ import

给定布局:

background \
    tasks  \
        __init__.py
        generic.py
        helpers.py
    __init__.py
    _server.py
    config.py
    router.py
    server.py

并用 celery -A background._server worker

启动 _server.py

当我尝试使用 .delay(..)

调用 generic.adder 函数时,我在 Worker 中得到了一个 KeyError: u'generic.adder'

加法函数:

文件generic.py

from background.server import app
from background.tasks.helpers import standardized_task

@standardized_task(app, name='generic.adder')
def adder(x, y):
    return x + y

..被一个函数包装,该函数采用 app 实例并将 Celery Task 的 input/output 标准化为 JSON 对象,该对象 returns 结果和功能。 (包含在下面)但是,问题是 当这个包装函数与 generic.adder 在同一个文件中时,它可以完美地工作——当它被导入并用作上面它抛出关键错误.

我被引导相信包装器正在以某种方式修改传递给 app.taskname=.. 属性,函数名称来自 helpers.py,这导致 generic.adder 从任务访问时找不到。

同样重要的是要注意,如果您尝试从 _server.py(来自 celery CLI 的模块 运行)内部调用 adder(..),它会完美运行;只有在通过分布式接口调用时才会抛出错误;意思是,导入工作独立于 Celery。

文件helpers.py

__author__ = 'Blake'

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

def standardized_task(app, *args, **kwargs):
    def wrapped_task(fn):
        def wrapped_fn(*fnargs, **fnkwargs):
            throws = fnkwargs.get('throws', Exception)
            raises = fnkwargs.get('raises', False)

            if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
                raise ValueError('throws value not of type Exception: %s' % type(throws))

            result, error = None, None

            try:
                result = fn(*fnargs, **fnkwargs)

                if type(result) not in JSON_TYPES:
                    result = unicode(result)

            except throws, e:
                error = e

                if raises:
                    raise
            finally:
                return {
                    'result': result,
                    'error': str(error) if error else None,
                    'meta': {
                        'args': fnargs, 'kwargs': fnkwargs
                    }
                }

        return app.task(wrapped_fn, *args, **kwargs)
    return wrapped_task

文件_server.py

from background.server import app
from background.tasks.generic import *

答案不是使用装饰器,而是将celery.Task扩展为抽象class并使用,@app.task(name='...', base=MyNewAbstractTask)

下面的 SO post 解释得更好:

celery task and customize decorator

import types

JSON_TYPES = [
    dict, list, unicode, str, int, long, float, bool, types.NoneType
]

class StandardizedTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        return self.inner_run(*args, **kwargs)

    def inner_run(self, *args, **kwargs):
        throws = kwargs.get('throws', Exception)
        raises = kwargs.get('raises', False)

        if not hasattr(throws, '__call__') and not isinstance(throws(), Exception):
            raise ValueError('throws value not of type Exception: %s' % type(throws))

        result, error = None, None

        try:
            result = self.run(*args, **kwargs)

            if type(result) not in JSON_TYPES:
                result = unicode(result)

        except throws, e:
            error = e

            if raises:
                raise
        finally:
            return {
                'result': result,
                'error': str(error) if error else None,
                'meta': {
                    'args': args, 'kwargs': kwargs }}