Celery+Docker+Django -- 让任务开始工作

Celery+Docker+Django -- Getting tasks to work

过去一周我一直在尝试学习 Celery,并将其添加到我使用 Django 和 Docker-Compose 的项目中。我很难理解如何让它工作;我的问题是在使用任务时我似乎无法上传到我的数据库来工作。上传功能 insertIntoDatabase 之前在没有任何 Celery 参与的情况下工作正常,但现在上传不起作用。事实上,当我尝试上传时,我的网站太快告诉我上传成功了,但实际上什么都没有上传。

服务器以 [​​=17=] 启动,这将进行迁移、执行迁移、收集静态文件、更新要求,然后启动服务器。这一切都是使用 pavement.py 完成的; Docker 文件中的命令是 CMD paver docker_run。 Celery worker 在任何时候都不会明确启动;我应该这样做吗?如果可以,怎么做?

这是我在 views.py 中调用上传函数的方式:

insertIntoDatabase.delay(datapoints, user, description)

上传函数定义在名为databaseinserter.py 的文件中。以下装饰器用于 insertIntoDatabase:

@shared_task(bind=True, name="database_insert", base=DBTask)

这里是celery.pyDBTaskclass的定义:

class DBTask(Task):
     abstract = True

     def on_failure(self, exc, *args, **kwargs):
        raise exc

我不太确定要为 tasks.py 写什么。这是一位前同事在我从他离开的地方接起之前给我留下的东西:

from celery.decorators import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task(name="database_insert")
def database_insert(data):

这是我用来配置 Celery (settings.py) 的设置:

BROKER_TRANSPORT = 'redis'
_REDIS_LOCATION = 'redis://{}:{}'.format(os.environ.get("REDIS_PORT_6379_TCP_ADDR"), os.environ.get("REDIS_PORT_6379_TCP_PORT"))
BROKER_URL = _REDIS_LOCATION + '/0'
CELERY_RESULT_BACKEND = _REDIS_LOCATION + '/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"

现在,我猜想 tasks.py 中的 database_insert 不应该为空,但是应该放什么?此外,似乎 tasks.py 中的任何事情都没有发生——当我添加一些日志语句以查看 tasks.py 是否至少是 运行 时,实际上没有任何内容被记录下来,让我觉得 tasks.py 甚至不是 运行。如何正确地将我的上传功能变成任务?

我认为你离完成这项工作不远了。

首先,我建议您尽量将 Celery 任务和业务逻辑分开。因此,例如,在 insertIntoDatabase 函数中包含将数据插入数据库的业务逻辑,然后单独创建一个 Celery 任务,可能名称为 insert_into_db_task,这可能很有意义在你的 args 中作为普通 python 对象(重要)并使用这些 args 调用上述 insertIntoDatabase 函数以实际完成数据库插入。

该示例的代码可能如下所示:

my_app/tasks/insert_into_db.py

from celery.decorators import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task()
def insert_into_db_task(datapoints, user, description):
    from my_app.services import insertIntoDatabase
    insertIntoDatabase(datapoints, user, description)

my_app/services/insertIntoDatabase.py

def insertIntoDatabase(datapoints, user, description):
    """Note that this function is not a task, by design"""

    # do db insertion stuff

my_app/views/insert_view.py

from my_app.tasks import insert_into_db_task

def simple_insert_view_func(request, args, kwargs):
    # start handling request, define datapoints, user, description

    # next line creates the **task** which will later do the db insertion
    insert_into_db_task.delay(datapoints, user, description)
    return Response(201)

我所暗示的应用程序结构正是我的做法,不是必需的。另请注意,您可以直接使用 @task() 并且不为其定义任何参数。可能会为您简化事情。

有帮助吗?我喜欢让我的任务轻松自如。他们大多只是做防混蛋(例如,确保涉及的对象存在于数据库中),调整任务失败时会发生什么(稍后重试?中止任务?等),日志记录,否则他们执行位于其他地方的业务逻辑。

此外,如果不是很明显,您确实需要在某个地方 运行ning 芹菜,以便有工作人员实际处理您的视图代码正在创建的任务。如果你不在某个地方 运行 celery 那么你的任务只会堆积在队列中并且永远不会被处理(因此你的数据库插入永远不会发生)。