Python 多线程进入 Celery 任务。 celery_task.update_state() 错误
Python multithreading into Celery task. celery_task.update_state() error
我正在尝试在 Celery 任务中实现线程池。
我的 Celery 任务调用 update_state() 函数将有关任务状态的信息发送到数据库。它成功地工作。
但是当我将线程添加到任务中并尝试在每个线程中调用 update_state() 函数时 - Celery returns 一个错误。
这是工作示例(没有线程):
import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
这不是工作示例(使用线程):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
错误是:
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
这是什么原因?为什么我不能将 update_state() 调用到线程中?
Celery 向线程添加了一种上下文对象,因此它知道它与哪个任务相关。为了将线程与任务相关联,您需要执行以下操作:
from celery.app import push_current_task
def run_subtask(celery_task, i):
push_current_task(celery_task)
...
pop_current_task()
我找到答案了!这是一位 Celery 贡献者的回答:
task.request是线程局部的,所以只有执行任务的线程才能调用update_state.
如果您认为线程可能与存储结果的任务 post 处理程序竞争,这尤其有意义。
您可以将 task_id 传递给线程:
cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to})
但是您必须确保线程在任务退出之前已加入并停止 (thread.join())。
在您的示例中,线程只能在 while 循环退出后加入,并且由于您正在休眠 1 秒,加入可能会延迟同样多。
我正在尝试在 Celery 任务中实现线程池。
我的 Celery 任务调用 update_state() 函数将有关任务状态的信息发送到数据库。它成功地工作。 但是当我将线程添加到任务中并尝试在每个线程中调用 update_state() 函数时 - Celery returns 一个错误。
这是工作示例(没有线程):
import celery
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
self.update_state(state=states.SUCCESS, meta={'subtask_id': i})
这不是工作示例(使用线程):
import celery
import threading
lock = threading.Lock()
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
错误是:
[2017-03-04 10:48:45,273: WARNING/PoolWorker-1] File "/usr/local/lib/python3.4/dist-packages/celery/backends/base.py",
line 558, in get_key_for_task self.task_keyprefix, key_t(task_id), key_t(key),
[2017-03-04 10:48:45,274: WARNING/PoolWorker-1] TypeError: sequence item 1: expected a bytes-like object, NoneType found
这是什么原因?为什么我不能将 update_state() 调用到线程中?
Celery 向线程添加了一种上下文对象,因此它知道它与哪个任务相关。为了将线程与任务相关联,您需要执行以下操作:
from celery.app import push_current_task
def run_subtask(celery_task, i):
push_current_task(celery_task)
...
pop_current_task()
我找到答案了!这是一位 Celery 贡献者的回答:
task.request是线程局部的,所以只有执行任务的线程才能调用update_state.
如果您认为线程可能与存储结果的任务 post 处理程序竞争,这尤其有意义。
您可以将 task_id 传递给线程:
cp_self.update_state(task_id=task_id, state='PROGRESS', meta={'timeout': to})
但是您必须确保线程在任务退出之前已加入并停止 (thread.join())。 在您的示例中,线程只能在 while 循环退出后加入,并且由于您正在休眠 1 秒,加入可能会延迟同样多。