在 Celery 任务之间共享一个通用的效用函数

Share a common utility function between Celery tasks

我在 Celery 中有很多任务都使用 canvas chain 连接起来。

@shared_task(bind=True)
def my_task_A(self):
    try:
        logger.debug('running task A')
        do something
    except Exception:
        run common cleanup function

@shared_task(bind=True)
def my_task_B(self):
    try:
        logger.debug('running task B')
        do something else
    except Exception:
        run common cleanup function

...

到目前为止一切顺利。问题是我正在寻找使用像这样的通用实用函数的最佳实践:

def cleanup_and_notify_user(task_data):
    logger.debug('task failed')
    send email
    delete folders
    ...

没有任务阻塞的最佳方法是什么? 例如,我可以将 run common cleanup function 替换为对 cleanup_and_notify_user(task_data) 的调用吗?如果来自多个 worker 的多个任务试图同时调用该函数,会发生什么情况?

每个工人都有自己的副本吗?我显然对这里的几个概念有点困惑。非常感谢任何帮助。

提前谢谢大家。

您只是在 celery 任务中编写代码 python 因此该任务有自己的进程,并且该函数将为每个任务实例化,就像在任何基本的 OOP 逻辑中一样。 当然,如果此清理功能尝试删除系统文件夹或数据库行等共享资源,您最终会遇到需要以其他方式解决的外部资源的并发访问问题,例如,在 fylesystem 的情况下,您可以为每个任务创建一个沙箱. 希望这有帮助