为 Django 项目中的可重用应用程序创建基于 class 的 Celery 任务

Create class based Celery task for reusable app in Django project

创建基于函数的任务对于 Django 项目来说非常干净。只需在 django 应用程序中创建 tasks.py 并开始编写此示例的任务,该示例取自 http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

的官方芹菜文档
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

但有时基于函数的任务是紧密耦合的,可重用性不高。所以我想创建基于 class 的 celery 任务,该任务记录在官方网站上。在遵循 https://github.com/celery/celery/issues/3874 之后,我可以创建示例任务,但我不确定创建基于 class 的任务的正确方法是否正确。

from __future__ import absolute_import, unicode_literals
from celery import shared_task, Task
import time
from celery import current_app


@shared_task
def add(x, y):
    time.sleep(5)
    return x + y


@shared_task
def mul(x, y):
    return x * y

# Sample class based task for testing
class AnotherTask(current_app.Task):
    name = 'tasks.another_task'
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def run(self):
        time.sleep(5)
        return self.x + self.y
# We need to manually register this class based task    
current_app.tasks.register(AnotherTask(3, 4))

我可以调用此任务,但每次调用结果值都相同

(workflow) alok@alok-MacBookAir:~/exp/expdjango/mysite$ ./manage.py shell
Python 3.6.3 (default, Oct  3 2017, 21:45:48) 
Type "copyright", "credits" or "license" for more information.

IPython 5.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: from polls.tasks import *

In [2]: r = AnotherTask(3,4).delay()

In [3]: r.get()
Out[3]: 7

In [4]: r = AnotherTask(5,6).delay()

In [5]: r.get()
Out[5]: 7

创建和调用基于 class 的任务的正确方法吗?

基于

Class 的任务实际上每个运行时只实例化一次。如果您希望您的任务被参数化,请将参数添加到 run 方法:

class AnotherTask(current_app.Task):
    name = 'tasks.another_task'

    def run(self, x, y):
        time.sleep(5)
        return x + y

然后你可以这样称呼它:

r = AnotherTask().delay(3, 4)
# Or
r = AnotherTask().apply_async(args=[3, 4])

这在the docs on class based tasks, and in particular in the instantiation section中也有描述。