检查是否在芹菜任务中

Check if in celery task

如何检查 celery 执行的函数?

def notification():
   # in_celery() returns True if called from celery_test(), 
   #                     False if called from not_celery_test()
   if in_celery():
      # Send mail directly without creation of additional celery subtask
      ...
   else:
      # Send mail with creation of celery task
      ...

@celery.task()
def celery_test():
    notification()

def not_celery_test():
    notification()

这是使用 celery.current_task 的一种方法。这是任务要使用的代码:

def notification():
    from celery import current_task
    if not current_task:
        print "directly called"
    elif current_task.request.id is None:
        print "called synchronously"
    else:
        print "dispatched"

@app.task
def notify():
    notification()

这是您可以 运行 练习上面的代码:

        from core.tasks import notify, notification
        print "DIRECT"
        notification()
        print "NOT DISPATCHED"
        notify()
        print "DISPATCHED"
        notify.delay().get()

我在第一个片段中的任务代码位于名为 core.tasks 的模块中。我将代码推送到自定义 Django 管理命令的最后一个片段中。这测试了 3 个案例:

  • 直接调用notification

  • 通过同步执行的任务调用notification。也就是说,这个任务不是通过 Celery 分发给 worker 的。任务的代码在调用 notify.

  • 的同一进程中执行
  • 通过工作人员 运行 的任务调用 notification。任务代码在与启动它的进程不同的进程中执行。

输出是:

NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called

DISPATCHED 之后的输出任务中没有来自 print 的行,因为该行最终出现在工作日志中:

[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched

重要说明:我最初在第一个测试中使用 if current_task is None 但它没有用。我检查并重新检查。 Celery 以某种方式将 current_task 设置为一个看起来像 None 的对象(如果你在其上使用 repr,你会得到 None)但不是 None。不确定那里发生了什么。使用 if not current_task 有效。

此外,我已经在 Django 应用程序中测试了上面的代码,但我没有在生产中使用它。可能有我不知道的陷阱。