Python 多线程进入 Celery 任务。 celery_task.update_state() 错误

Python multithreading into Celery task. celery_task.update_state() error

我正在尝试在 Celery 任务中实现线程池。

我的 Celery 任务调用 update_state() 函数将有关任务状态的信息发送到数据库。它成功地工作。 但是当我将线程添加到任务中并尝试在每个线程中调用 update_state() 函数时 - Celery returns 一个错误。

这是工作示例(没有线程):

import celery

@celery.task(bind=True)
def get_info(self, user):
    for i in xrange(4):
        self.update_state(state=states.SUCCESS, meta={'subtask_id': i})

这不是工作示例(使用线程):

import celery
import threading

lock = threading.Lock()

def run_subtask(celery_task, i):
    lock.acquire()
    #Error raises here, when update_state calls
    celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
    lock.release()

@celery.task(bind=True)
def get_info(self, user):

    for i in xrange(4):
        worker = threading.Thread(target=run_subtask, args=(self, i))
        worker.start()

错误是:

    [2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py", 
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key), 
    [2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found

这是什么原因?为什么我不能将 update_state() 调用到线程中?

Celery 向线程添加了一种上下文对象,因此它知道它与哪个任务相关。为了将线程与任务相关联,您需要执行以下操作:

from celery.app import push_current_task


def run_subtask(celery_task, i):
    push_current_task(celery_task)

    ...

    pop_current_task()

我找到答案了!这是一位 Celery 贡献者的回答:

task.request是线程局部的,所以只有执行任务的线程才能调用update_state.

如果您认为线程可能与存储结果的任务 post 处理程序竞争,这尤其有意义。

您可以将 task_id 传递给线程:

cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to})

但是您必须确保线程在任务退出之前已加入并停止 (thread.join())。 在您的示例中,线程只能在 while 循环退出后加入,并且由于您正在休眠 1 秒,加入可能会延迟同样多。