在将 Celery 任务发送给远程工作者之前,如何从 Django 数据库中检索数据?

How do I retrieve data from a Django DB before sending off Celery task to remote worker?

我有一个芹菜 shared_task,计划在特定时间间隔 运行。这个任务每次都是运行,需要先从Django DB中获取数据才能完成计算。此任务可能会也可能不会发送到单独机器上的 celery worker,因此在 celery 任务中我无法对本地 celery 数据库进行任何查询。

到目前为止,我已经尝试使用信号来完成它,因为我知道带有包装器 @before_task_publish 的函数甚至在任务发布到消息队列之前就已执行。但是,我不知道如何才能真正将数据获取到任务中。

@shared_task
def run_computation(data):
    perform_computation(data)


@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
   data = create_data()
   # How do I get the data to the task from here?

首先,这是解决这个问题的正确方法吗?或者我会更好地制作 celery 任务可以检索数据的 API 路线吗?

可以从 before_task_publish 处理程序就地修改任务数据,以便将其传递给任务。我会先说 many reasons 为什么这不是一个好主意:

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    data = create_data()
    # Modify the body of the task data. 
    # Body is a tuple, the first entry of which is a tuple of arguments to the task. 
    # So we replace the first argument (data) with our own.
    body[0][0] = data
    # Alternatively modify the kwargs, which is a bit more explicit
    body[1]['data'] = data

这行得通,但应该很明显为什么它有风险并且容易破损。假设您可以控制任务调用站点,我认为最好完全放弃信号并只使用一个简单的函数来为您完成工作,即:

def create_task(data):
    data = create_data()
    run_computation.delay(data)

然后在您的调用代码中,只需调用 create_task(data) 而不是直接调用任务(这可能是您现在正在做的)。

我正在发布对我有用的解决方案,感谢@solarissmoke 的帮助。 对我来说最有效的是利用 Celery“链”回调函数和单独的 RabbitMQ 队列来指定本地计算的内容和远程工作人员计算的内容。 我的解决方案看起来像这样:

@app.task
def create_data_task():
   # this creates the data to be passed to the analysis function
   return create_data()

@app.task
def perform_computation_task(data):
   # This performs the computation with given data
   return perform_computation(data)

@app.task
def store_results(result):
   # This would store the result in the DB here, but for now we just print it
   print(result)

@app.task
def run_all_computation():
   task = signature("path.to.tasks.create_data_task", queue="default") | signature("path.to.tasks.perform_computation_task", queue="remote_computation") | signature("path.to.tasks.store_results", queue="default") 
   task()

重要的是要注意这些任务不是 运行 连续的;它们实际上是由工作人员 运行 独立完成的任务,因此不会阻塞单个线程。其他任务仅由其他任务的回调函数激活。我在 RabbitMQ 中声明了两个 celery 队列,一个名为 default 的默认队列,一个名为“remote_computation”的专门用于远程计算的队列。这已明确描述here,包括如何将 celery worker 指向创建的队列。