在 Celery 中使用 shared_task 时如何在任务执行之前 运行 编写代码
How do I run code before task execution when using a shared_task in Celery
我要求我的所有 Celery 任务都必须使用特定的关键字参数来调用。我想在执行任务之前检查并使用关键字的值。
例如,假设我有以下内容:
@shared_task
def my_task(*args, **kwargs):
foo = kwargs.get('bar') # -> I don't want to copy this to all my tasks
# Do stuff here
我如何创建一个名为 my_special_shared_task
的新装饰器,以便下面等同于上面的内容:
@my_special_shared_task
def my_task(*args, **kwargs):
# Do stuff here
任务继承呢?
类似于:
class BaseTask(celery.Task):
foo = "some_value"
@app.task(base=BaseTask)
def my_task(*args, **kwargs):
# Do stuff here
print(self.foo)
here 是文档。
我要求我的所有 Celery 任务都必须使用特定的关键字参数来调用。我想在执行任务之前检查并使用关键字的值。 例如,假设我有以下内容:
@shared_task
def my_task(*args, **kwargs):
foo = kwargs.get('bar') # -> I don't want to copy this to all my tasks
# Do stuff here
我如何创建一个名为 my_special_shared_task
的新装饰器,以便下面等同于上面的内容:
@my_special_shared_task
def my_task(*args, **kwargs):
# Do stuff here
任务继承呢? 类似于:
class BaseTask(celery.Task):
foo = "some_value"
@app.task(base=BaseTask)
def my_task(*args, **kwargs):
# Do stuff here
print(self.foo)
here 是文档。