用户定义的 celery 任务 class:在导入期间调用 init

User defined celery task class : init is getting called during import

我正在尝试将 celery 任务用作 class 并查看以下行为。我想我错过了什么。让我先告诉你我想要达到的目标:
1. 创建一个 class 及其初始化函数,芹菜只会调用一次。这将为我的 class 设置所需的参数。我要在这里创建一个线程池。
2. 在生产者中创建这个 celery 任务对象的实例并将作业放入其中。

为了达到同样的效果,我尝试了 celery 网站上提到的简单示例并创建了一个示例 class。我正在使用 :

创建任务

celery -c 1 -A proj worker --loglevel=debug

一开始它似乎在工作,但后来我观察到任务的初始化在 tester.py 中的导入时被调用,我可以通过传递标志来停止对象使用中的这个初始化,但导入期间的初始化是真实的关注这里。

你能指出这个例子的正确用法吗?我不希望 init of task class 被调用的次数超过我使用 celery 命令调用的次数。在现实生活中,它会创建不必要的线程。

此外,如果可能,请指出最接近我上述要求的示例。

celery.py

from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
         broker='amqp://',
         backend='amqp://',
         include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py

from __future__ import absolute_import
from proj.celery import app

class NaiveAuthenticateServer(app.Task):

def __init__(self, celeryInst = 1):
    if celeryInst == 1:
        print "Hi, I am celery instant"
    else :
        print "Did you invoke me from command"
    self.users = {'george': 'password'}

def run(self, username, password):
    try:
        return self.users[username] == password
    except KeyError:
        return False

tester.py

from proj import tasks
obj = tasks.NaiveAuthenticateServer(0)
res = obj.delay('hi', 'hello')
print res.get()

o/p 共 tester.py

大家好,我是celery instant
你从命令中调用我了吗
错误

您不应该自己创建任务实例 class,而是让 celery 在进程启动时自动为您创建。

因此,您需要定义一个使用基class:

的任务函数
@app.task(base=NaiveAuthenticateServer)
def my_task(arg1, arg2):
    print arg1, arg2

然后像这样提交任务:

from proj import tasks
tasks.my_task.delay('hi', 'hello')