在将 Celery 任务发送给远程工作者之前,如何从 Django 数据库中检索数据?
How do I retrieve data from a Django DB before sending off Celery task to remote worker?
我有一个芹菜 shared_task
,计划在特定时间间隔 运行。这个任务每次都是运行,需要先从Django DB中获取数据才能完成计算。此任务可能会也可能不会发送到单独机器上的 celery worker,因此在 celery 任务中我无法对本地 celery 数据库进行任何查询。
到目前为止,我已经尝试使用信号来完成它,因为我知道带有包装器 @before_task_publish
的函数甚至在任务发布到消息队列之前就已执行。但是,我不知道如何才能真正将数据获取到任务中。
@shared_task
def run_computation(data):
perform_computation(data)
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
data = create_data()
# How do I get the data to the task from here?
首先,这是解决这个问题的正确方法吗?或者我会更好地制作 celery 任务可以检索数据的 API 路线吗?
可以从 before_task_publish
处理程序就地修改任务数据,以便将其传递给任务。我会先说 many reasons 为什么这不是一个好主意:
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
data = create_data()
# Modify the body of the task data.
# Body is a tuple, the first entry of which is a tuple of arguments to the task.
# So we replace the first argument (data) with our own.
body[0][0] = data
# Alternatively modify the kwargs, which is a bit more explicit
body[1]['data'] = data
这行得通,但应该很明显为什么它有风险并且容易破损。假设您可以控制任务调用站点,我认为最好完全放弃信号并只使用一个简单的函数来为您完成工作,即:
def create_task(data):
data = create_data()
run_computation.delay(data)
然后在您的调用代码中,只需调用 create_task(data)
而不是直接调用任务(这可能是您现在正在做的)。
我正在发布对我有用的解决方案,感谢@solarissmoke 的帮助。
对我来说最有效的是利用 Celery“链”回调函数和单独的 RabbitMQ 队列来指定本地计算的内容和远程工作人员计算的内容。
我的解决方案看起来像这样:
@app.task
def create_data_task():
# this creates the data to be passed to the analysis function
return create_data()
@app.task
def perform_computation_task(data):
# This performs the computation with given data
return perform_computation(data)
@app.task
def store_results(result):
# This would store the result in the DB here, but for now we just print it
print(result)
@app.task
def run_all_computation():
task = signature("path.to.tasks.create_data_task", queue="default") | signature("path.to.tasks.perform_computation_task", queue="remote_computation") | signature("path.to.tasks.store_results", queue="default")
task()
重要的是要注意这些任务不是 运行 连续的;它们实际上是由工作人员 运行 独立完成的任务,因此不会阻塞单个线程。其他任务仅由其他任务的回调函数激活。我在 RabbitMQ 中声明了两个 celery 队列,一个名为 default 的默认队列,一个名为“remote_computation”的专门用于远程计算的队列。这已明确描述here,包括如何将 celery worker 指向创建的队列。
我有一个芹菜 shared_task
,计划在特定时间间隔 运行。这个任务每次都是运行,需要先从Django DB中获取数据才能完成计算。此任务可能会也可能不会发送到单独机器上的 celery worker,因此在 celery 任务中我无法对本地 celery 数据库进行任何查询。
到目前为止,我已经尝试使用信号来完成它,因为我知道带有包装器 @before_task_publish
的函数甚至在任务发布到消息队列之前就已执行。但是,我不知道如何才能真正将数据获取到任务中。
@shared_task
def run_computation(data):
perform_computation(data)
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
data = create_data()
# How do I get the data to the task from here?
首先,这是解决这个问题的正确方法吗?或者我会更好地制作 celery 任务可以检索数据的 API 路线吗?
可以从 before_task_publish
处理程序就地修改任务数据,以便将其传递给任务。我会先说 many reasons 为什么这不是一个好主意:
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
data = create_data()
# Modify the body of the task data.
# Body is a tuple, the first entry of which is a tuple of arguments to the task.
# So we replace the first argument (data) with our own.
body[0][0] = data
# Alternatively modify the kwargs, which is a bit more explicit
body[1]['data'] = data
这行得通,但应该很明显为什么它有风险并且容易破损。假设您可以控制任务调用站点,我认为最好完全放弃信号并只使用一个简单的函数来为您完成工作,即:
def create_task(data):
data = create_data()
run_computation.delay(data)
然后在您的调用代码中,只需调用 create_task(data)
而不是直接调用任务(这可能是您现在正在做的)。
我正在发布对我有用的解决方案,感谢@solarissmoke 的帮助。 对我来说最有效的是利用 Celery“链”回调函数和单独的 RabbitMQ 队列来指定本地计算的内容和远程工作人员计算的内容。 我的解决方案看起来像这样:
@app.task
def create_data_task():
# this creates the data to be passed to the analysis function
return create_data()
@app.task
def perform_computation_task(data):
# This performs the computation with given data
return perform_computation(data)
@app.task
def store_results(result):
# This would store the result in the DB here, but for now we just print it
print(result)
@app.task
def run_all_computation():
task = signature("path.to.tasks.create_data_task", queue="default") | signature("path.to.tasks.perform_computation_task", queue="remote_computation") | signature("path.to.tasks.store_results", queue="default")
task()
重要的是要注意这些任务不是 运行 连续的;它们实际上是由工作人员 运行 独立完成的任务,因此不会阻塞单个线程。其他任务仅由其他任务的回调函数激活。我在 RabbitMQ 中声明了两个 celery 队列,一个名为 default 的默认队列,一个名为“remote_computation”的专门用于远程计算的队列。这已明确描述here,包括如何将 celery worker 指向创建的队列。