查询任务状态——Celery和redis
Query task state - Celery & redis
好吧,我有一个我认为相对简单的问题,就像我正在用它撞墙一样。我有一个烧瓶应用程序和一个网页,允许您使用 celery & redis(broker) 在服务器端 运行 一些脚本。
我想做的就是当我开始一个任务时给它一个name/id(任务将被描绘成客户端的一个按钮)即
@app.route('/start_upgrade/<task_name>')
def start_upgrade(task_name):
example_task.delay(1, 2, task_name=task_name)
然后在任务开始后我想看看任务是否在一个单独的请求中 running/waiting/finished,最好像;
@app.route('/check_upgrade_status/<task_name>')
def get_task_status(task_name):
task = celery.get_task_by_name(task_name)
task_state = task.state
return task_state # pseudocode
但我在文档中找不到类似的内容。我对芹菜很陌生,但仅供参考,所以假设我一无所知。另外为了更加明显,我需要能够从 python 查询任务状态,请不要使用 CLI 命令。
也欢迎使用任何替代方法来实现我查询队列的目标。
当您使用 delay
或 apply_async
开始任务时,会创建一个对象 AsyncResult
并包含任务的 ID。要获得它,您只需将它存储在一个变量中。
例如
@app.route('/start_upgrade/<task_name>')
def start_upgrade(task_name):
res = example_task.delay(1, 2, task_name=task_name)
print res.id
您可以存储此 ID,并可能将其与数据库中的其他内容相关联(或者像我在示例中所做的那样打印它)。
然后您可以在 python 控制台中检查您的任务状态:
from celery.result import AsyncResult
AsyncResult(your_task_id).status
看看结果文档,你应该在那里得到你需要的东西:http://docs.celeryproject.org/en/latest/reference/celery.result.html
我最终从亚瑟 post 那里找到了解决我问题的方法。
结合redis我创建了这些函数
import redis
from celery.result import AsyncResult
redis_cache = redis.StrictRedis(host='localhost', port=6379, db=0)
def check_task_status(task_name):
task_id = redis_cache.get(task_name)
return AsyncResult(task_id).status
def start_task(task, task_name, *args, **kwargs):
response = task.delay(*args, **kwargs)
redis_cache.set(task_name, response.id)
这让我可以为任务定义特定的名称。请注意,我还没有真正测试过这个,但它是有道理的。
用法示例;
start_task(example_task, "example_name", 1, 2)
好吧,我有一个我认为相对简单的问题,就像我正在用它撞墙一样。我有一个烧瓶应用程序和一个网页,允许您使用 celery & redis(broker) 在服务器端 运行 一些脚本。
我想做的就是当我开始一个任务时给它一个name/id(任务将被描绘成客户端的一个按钮)即
@app.route('/start_upgrade/<task_name>')
def start_upgrade(task_name):
example_task.delay(1, 2, task_name=task_name)
然后在任务开始后我想看看任务是否在一个单独的请求中 running/waiting/finished,最好像;
@app.route('/check_upgrade_status/<task_name>')
def get_task_status(task_name):
task = celery.get_task_by_name(task_name)
task_state = task.state
return task_state # pseudocode
但我在文档中找不到类似的内容。我对芹菜很陌生,但仅供参考,所以假设我一无所知。另外为了更加明显,我需要能够从 python 查询任务状态,请不要使用 CLI 命令。
也欢迎使用任何替代方法来实现我查询队列的目标。
当您使用 delay
或 apply_async
开始任务时,会创建一个对象 AsyncResult
并包含任务的 ID。要获得它,您只需将它存储在一个变量中。
例如
@app.route('/start_upgrade/<task_name>')
def start_upgrade(task_name):
res = example_task.delay(1, 2, task_name=task_name)
print res.id
您可以存储此 ID,并可能将其与数据库中的其他内容相关联(或者像我在示例中所做的那样打印它)。
然后您可以在 python 控制台中检查您的任务状态:
from celery.result import AsyncResult
AsyncResult(your_task_id).status
看看结果文档,你应该在那里得到你需要的东西:http://docs.celeryproject.org/en/latest/reference/celery.result.html
我最终从亚瑟 post 那里找到了解决我问题的方法。
结合redis我创建了这些函数
import redis
from celery.result import AsyncResult
redis_cache = redis.StrictRedis(host='localhost', port=6379, db=0)
def check_task_status(task_name):
task_id = redis_cache.get(task_name)
return AsyncResult(task_id).status
def start_task(task, task_name, *args, **kwargs):
response = task.delay(*args, **kwargs)
redis_cache.set(task_name, response.id)
这让我可以为任务定义特定的名称。请注意,我还没有真正测试过这个,但它是有道理的。
用法示例;
start_task(example_task, "example_name", 1, 2)