Celery+Docker+Django -- 让任务开始工作
Celery+Docker+Django -- Getting tasks to work
过去一周我一直在尝试学习 Celery,并将其添加到我使用 Django 和 Docker-Compose 的项目中。我很难理解如何让它工作;我的问题是在使用任务时我似乎无法上传到我的数据库来工作。上传功能 insertIntoDatabase
之前在没有任何 Celery 参与的情况下工作正常,但现在上传不起作用。事实上,当我尝试上传时,我的网站太快告诉我上传成功了,但实际上什么都没有上传。
服务器以 [=17=] 启动,这将进行迁移、执行迁移、收集静态文件、更新要求,然后启动服务器。这一切都是使用 pavement.py
完成的; Docker 文件中的命令是 CMD paver docker_run
。 Celery worker 在任何时候都不会明确启动;我应该这样做吗?如果可以,怎么做?
这是我在 views.py
中调用上传函数的方式:
insertIntoDatabase.delay(datapoints, user, description)
上传函数定义在名为databaseinserter.py
的文件中。以下装饰器用于 insertIntoDatabase
:
@shared_task(bind=True, name="database_insert", base=DBTask)
这里是celery.py
中DBTask
class的定义:
class DBTask(Task):
abstract = True
def on_failure(self, exc, *args, **kwargs):
raise exc
我不太确定要为 tasks.py
写什么。这是一位前同事在我从他离开的地方接起之前给我留下的东西:
from celery.decorators import task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task(name="database_insert")
def database_insert(data):
这是我用来配置 Celery (settings.py
) 的设置:
BROKER_TRANSPORT = 'redis'
_REDIS_LOCATION = 'redis://{}:{}'.format(os.environ.get("REDIS_PORT_6379_TCP_ADDR"), os.environ.get("REDIS_PORT_6379_TCP_PORT"))
BROKER_URL = _REDIS_LOCATION + '/0'
CELERY_RESULT_BACKEND = _REDIS_LOCATION + '/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"
现在,我猜想 tasks.py
中的 database_insert
不应该为空,但是应该放什么?此外,似乎 tasks.py
中的任何事情都没有发生——当我添加一些日志语句以查看 tasks.py
是否至少是 运行 时,实际上没有任何内容被记录下来,让我觉得 tasks.py
甚至不是 运行。如何正确地将我的上传功能变成任务?
我认为你离完成这项工作不远了。
首先,我建议您尽量将 Celery 任务和业务逻辑分开。因此,例如,在 insertIntoDatabase
函数中包含将数据插入数据库的业务逻辑,然后单独创建一个 Celery 任务,可能名称为 insert_into_db_task
,这可能很有意义在你的 args 中作为普通 python 对象(重要)并使用这些 args 调用上述 insertIntoDatabase
函数以实际完成数据库插入。
该示例的代码可能如下所示:
my_app/tasks/insert_into_db.py
from celery.decorators import task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task()
def insert_into_db_task(datapoints, user, description):
from my_app.services import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)
my_app/services/insertIntoDatabase.py
def insertIntoDatabase(datapoints, user, description):
"""Note that this function is not a task, by design"""
# do db insertion stuff
my_app/views/insert_view.py
from my_app.tasks import insert_into_db_task
def simple_insert_view_func(request, args, kwargs):
# start handling request, define datapoints, user, description
# next line creates the **task** which will later do the db insertion
insert_into_db_task.delay(datapoints, user, description)
return Response(201)
我所暗示的应用程序结构正是我的做法,不是必需的。另请注意,您可以直接使用 @task()
并且不为其定义任何参数。可能会为您简化事情。
有帮助吗?我喜欢让我的任务轻松自如。他们大多只是做防混蛋(例如,确保涉及的对象存在于数据库中),调整任务失败时会发生什么(稍后重试?中止任务?等),日志记录,否则他们执行位于其他地方的业务逻辑。
此外,如果不是很明显,您确实需要在某个地方 运行ning 芹菜,以便有工作人员实际处理您的视图代码正在创建的任务。如果你不在某个地方 运行 celery 那么你的任务只会堆积在队列中并且永远不会被处理(因此你的数据库插入永远不会发生)。
过去一周我一直在尝试学习 Celery,并将其添加到我使用 Django 和 Docker-Compose 的项目中。我很难理解如何让它工作;我的问题是在使用任务时我似乎无法上传到我的数据库来工作。上传功能 insertIntoDatabase
之前在没有任何 Celery 参与的情况下工作正常,但现在上传不起作用。事实上,当我尝试上传时,我的网站太快告诉我上传成功了,但实际上什么都没有上传。
服务器以 [=17=] 启动,这将进行迁移、执行迁移、收集静态文件、更新要求,然后启动服务器。这一切都是使用 pavement.py
完成的; Docker 文件中的命令是 CMD paver docker_run
。 Celery worker 在任何时候都不会明确启动;我应该这样做吗?如果可以,怎么做?
这是我在 views.py
中调用上传函数的方式:
insertIntoDatabase.delay(datapoints, user, description)
上传函数定义在名为databaseinserter.py
的文件中。以下装饰器用于 insertIntoDatabase
:
@shared_task(bind=True, name="database_insert", base=DBTask)
这里是celery.py
中DBTask
class的定义:
class DBTask(Task):
abstract = True
def on_failure(self, exc, *args, **kwargs):
raise exc
我不太确定要为 tasks.py
写什么。这是一位前同事在我从他离开的地方接起之前给我留下的东西:
from celery.decorators import task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task(name="database_insert")
def database_insert(data):
这是我用来配置 Celery (settings.py
) 的设置:
BROKER_TRANSPORT = 'redis'
_REDIS_LOCATION = 'redis://{}:{}'.format(os.environ.get("REDIS_PORT_6379_TCP_ADDR"), os.environ.get("REDIS_PORT_6379_TCP_PORT"))
BROKER_URL = _REDIS_LOCATION + '/0'
CELERY_RESULT_BACKEND = _REDIS_LOCATION + '/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"
现在,我猜想 tasks.py
中的 database_insert
不应该为空,但是应该放什么?此外,似乎 tasks.py
中的任何事情都没有发生——当我添加一些日志语句以查看 tasks.py
是否至少是 运行 时,实际上没有任何内容被记录下来,让我觉得 tasks.py
甚至不是 运行。如何正确地将我的上传功能变成任务?
我认为你离完成这项工作不远了。
首先,我建议您尽量将 Celery 任务和业务逻辑分开。因此,例如,在 insertIntoDatabase
函数中包含将数据插入数据库的业务逻辑,然后单独创建一个 Celery 任务,可能名称为 insert_into_db_task
,这可能很有意义在你的 args 中作为普通 python 对象(重要)并使用这些 args 调用上述 insertIntoDatabase
函数以实际完成数据库插入。
该示例的代码可能如下所示:
my_app/tasks/insert_into_db.py
from celery.decorators import task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task()
def insert_into_db_task(datapoints, user, description):
from my_app.services import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)
my_app/services/insertIntoDatabase.py
def insertIntoDatabase(datapoints, user, description):
"""Note that this function is not a task, by design"""
# do db insertion stuff
my_app/views/insert_view.py
from my_app.tasks import insert_into_db_task
def simple_insert_view_func(request, args, kwargs):
# start handling request, define datapoints, user, description
# next line creates the **task** which will later do the db insertion
insert_into_db_task.delay(datapoints, user, description)
return Response(201)
我所暗示的应用程序结构正是我的做法,不是必需的。另请注意,您可以直接使用 @task()
并且不为其定义任何参数。可能会为您简化事情。
有帮助吗?我喜欢让我的任务轻松自如。他们大多只是做防混蛋(例如,确保涉及的对象存在于数据库中),调整任务失败时会发生什么(稍后重试?中止任务?等),日志记录,否则他们执行位于其他地方的业务逻辑。
此外,如果不是很明显,您确实需要在某个地方 运行ning 芹菜,以便有工作人员实际处理您的视图代码正在创建的任务。如果你不在某个地方 运行 celery 那么你的任务只会堆积在队列中并且永远不会被处理(因此你的数据库插入永远不会发生)。