检查是否在芹菜任务中
Check if in celery task
如何检查 celery 执行的函数?
def notification():
# in_celery() returns True if called from celery_test(),
# False if called from not_celery_test()
if in_celery():
# Send mail directly without creation of additional celery subtask
...
else:
# Send mail with creation of celery task
...
@celery.task()
def celery_test():
notification()
def not_celery_test():
notification()
这是使用 celery.current_task
的一种方法。这是任务要使用的代码:
def notification():
from celery import current_task
if not current_task:
print "directly called"
elif current_task.request.id is None:
print "called synchronously"
else:
print "dispatched"
@app.task
def notify():
notification()
这是您可以 运行 练习上面的代码:
from core.tasks import notify, notification
print "DIRECT"
notification()
print "NOT DISPATCHED"
notify()
print "DISPATCHED"
notify.delay().get()
我在第一个片段中的任务代码位于名为 core.tasks
的模块中。我将代码推送到自定义 Django 管理命令的最后一个片段中。这测试了 3 个案例:
直接调用notification
。
通过同步执行的任务调用notification
。也就是说,这个任务不是通过 Celery 分发给 worker 的。任务的代码在调用 notify
.
的同一进程中执行
通过工作人员 运行 的任务调用 notification
。任务代码在与启动它的进程不同的进程中执行。
输出是:
NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called
DISPATCHED
之后的输出任务中没有来自 print
的行,因为该行最终出现在工作日志中:
[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched
重要说明:我最初在第一个测试中使用 if current_task is None
但它没有用。我检查并重新检查。 Celery 以某种方式将 current_task
设置为一个看起来像 None
的对象(如果你在其上使用 repr
,你会得到 None
)但不是 None
。不确定那里发生了什么。使用 if not current_task
有效。
此外,我已经在 Django 应用程序中测试了上面的代码,但我没有在生产中使用它。可能有我不知道的陷阱。
如何检查 celery 执行的函数?
def notification():
# in_celery() returns True if called from celery_test(),
# False if called from not_celery_test()
if in_celery():
# Send mail directly without creation of additional celery subtask
...
else:
# Send mail with creation of celery task
...
@celery.task()
def celery_test():
notification()
def not_celery_test():
notification()
这是使用 celery.current_task
的一种方法。这是任务要使用的代码:
def notification():
from celery import current_task
if not current_task:
print "directly called"
elif current_task.request.id is None:
print "called synchronously"
else:
print "dispatched"
@app.task
def notify():
notification()
这是您可以 运行 练习上面的代码:
from core.tasks import notify, notification
print "DIRECT"
notification()
print "NOT DISPATCHED"
notify()
print "DISPATCHED"
notify.delay().get()
我在第一个片段中的任务代码位于名为 core.tasks
的模块中。我将代码推送到自定义 Django 管理命令的最后一个片段中。这测试了 3 个案例:
直接调用
notification
。通过同步执行的任务调用
notification
。也就是说,这个任务不是通过 Celery 分发给 worker 的。任务的代码在调用notify
. 的同一进程中执行
通过工作人员 运行 的任务调用
notification
。任务代码在与启动它的进程不同的进程中执行。
输出是:
NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called
DISPATCHED
之后的输出任务中没有来自 print
的行,因为该行最终出现在工作日志中:
[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched
重要说明:我最初在第一个测试中使用 if current_task is None
但它没有用。我检查并重新检查。 Celery 以某种方式将 current_task
设置为一个看起来像 None
的对象(如果你在其上使用 repr
,你会得到 None
)但不是 None
。不确定那里发生了什么。使用 if not current_task
有效。
此外,我已经在 Django 应用程序中测试了上面的代码,但我没有在生产中使用它。可能有我不知道的陷阱。