在 Celery 中使用 shared_task 时如何在任务执行之前 运行 编写代码

How do I run code before task execution when using a shared_task in Celery

我要求我的所有 Celery 任务都必须使用特定的关键字参数来调用。我想在执行任务之前检查并使用关键字的值。 例如,假设我有以下内容:

@shared_task
def my_task(*args, **kwargs):
    foo = kwargs.get('bar')  # -> I don't want to copy this to all my tasks
    # Do stuff here

我如何创建一个名为 my_special_shared_task 的新装饰器,以便下面等同于上面的内容:

@my_special_shared_task
def my_task(*args, **kwargs):
    # Do stuff here

任务继承呢? 类似于:

class BaseTask(celery.Task):
    foo = "some_value"


@app.task(base=BaseTask)
def my_task(*args, **kwargs):
    # Do stuff here
    print(self.foo)

here 是文档。