用户定义的 celery 任务 class:在导入期间调用 init
User defined celery task class : init is getting called during import
我正在尝试将 celery 任务用作 class 并查看以下行为。我想我错过了什么。让我先告诉你我想要达到的目标:
1. 创建一个 class 及其初始化函数,芹菜只会调用一次。这将为我的 class 设置所需的参数。我要在这里创建一个线程池。
2. 在生产者中创建这个 celery 任务对象的实例并将作业放入其中。
为了达到同样的效果,我尝试了 celery 网站上提到的简单示例并创建了一个示例 class。我正在使用 :
创建任务
celery -c 1 -A proj worker --loglevel=debug
一开始它似乎在工作,但后来我观察到任务的初始化在 tester.py 中的导入时被调用,我可以通过传递标志来停止对象使用中的这个初始化,但导入期间的初始化是真实的关注这里。
你能指出这个例子的正确用法吗?我不希望 init of task class 被调用的次数超过我使用 celery 命令调用的次数。在现实生活中,它会创建不必要的线程。
此外,如果可能,请指出最接近我上述要求的示例。
celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
tasks.py
from __future__ import absolute_import
from proj.celery import app
class NaiveAuthenticateServer(app.Task):
def __init__(self, celeryInst = 1):
if celeryInst == 1:
print "Hi, I am celery instant"
else :
print "Did you invoke me from command"
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
tester.py
from proj import tasks
obj = tasks.NaiveAuthenticateServer(0)
res = obj.delay('hi', 'hello')
print res.get()
o/p 共 tester.py
大家好,我是celery instant
你从命令中调用我了吗
错误
您不应该自己创建任务实例 class,而是让 celery 在进程启动时自动为您创建。
因此,您需要定义一个使用基class:
的任务函数
@app.task(base=NaiveAuthenticateServer)
def my_task(arg1, arg2):
print arg1, arg2
然后像这样提交任务:
from proj import tasks
tasks.my_task.delay('hi', 'hello')
我正在尝试将 celery 任务用作 class 并查看以下行为。我想我错过了什么。让我先告诉你我想要达到的目标:
1. 创建一个 class 及其初始化函数,芹菜只会调用一次。这将为我的 class 设置所需的参数。我要在这里创建一个线程池。
2. 在生产者中创建这个 celery 任务对象的实例并将作业放入其中。
为了达到同样的效果,我尝试了 celery 网站上提到的简单示例并创建了一个示例 class。我正在使用 :
创建任务celery -c 1 -A proj worker --loglevel=debug
一开始它似乎在工作,但后来我观察到任务的初始化在 tester.py 中的导入时被调用,我可以通过传递标志来停止对象使用中的这个初始化,但导入期间的初始化是真实的关注这里。
你能指出这个例子的正确用法吗?我不希望 init of task class 被调用的次数超过我使用 celery 命令调用的次数。在现实生活中,它会创建不必要的线程。
此外,如果可能,请指出最接近我上述要求的示例。
celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
tasks.py
from __future__ import absolute_import
from proj.celery import app
class NaiveAuthenticateServer(app.Task):
def __init__(self, celeryInst = 1):
if celeryInst == 1:
print "Hi, I am celery instant"
else :
print "Did you invoke me from command"
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
tester.py
from proj import tasks
obj = tasks.NaiveAuthenticateServer(0)
res = obj.delay('hi', 'hello')
print res.get()
o/p 共 tester.py
大家好,我是celery instant
你从命令中调用我了吗
错误
您不应该自己创建任务实例 class,而是让 celery 在进程启动时自动为您创建。
因此,您需要定义一个使用基class:
的任务函数@app.task(base=NaiveAuthenticateServer)
def my_task(arg1, arg2):
print arg1, arg2
然后像这样提交任务:
from proj import tasks
tasks.my_task.delay('hi', 'hello')