Flower 中的高级任务格式化(Celery 监控)
Advanced task formatting in Flower (Celery monitoring)
我使用 Flower 来监控我的 Celery 任务。
我正在尝试更改任务的显示方式(在 Tasks 选项卡下)以使列表看起来更“有条理”。例如,显示 <list (6 items)>
而不是 [1, 2, 3, ...
.
不幸的是,overriding the format_task
method 有局限性:
task.args
和 task.kwargs
是字符串表示,通常被截断,而不是 list/dict
- HTML 对除
task.name
之外的每个字段进行转义
- 如果函数没有 return 值,显示任务时会抛出 AJAX 错误
为了取回原来的 args
和 kwargs
对象,我使用 eval(task.args)
以便之后可以遍历它们的项目。评估随机字符串对我来说有点不安全,你能推荐一个比这样做更好的方法吗?
我找到了一个解决方案,可以将 format_task
中的任务参数作为对象(lists/dicts 等)而不是截断的字符串表示。
apply_async()
方法的 argsrepr
和 kwargsrepr
参数允许为任务的参数指定自定义表示。
我创建了一个自定义 Task class,重写了 delay
方法,如下所示:
import json
from celery import Task
class FlowerTask(Task):
def delay(self, *args, **kwargs):
argsrepr, kwargsrepr = [], {}
for arg in args:
if isinstance(arg, bytes):
argsrepr.append("<binary content>")
elif isinstance(arg, list):
argsrepr.append("<list ({} items)>".format(len(arg)))
elif isinstance(arg, dict):
argsrepr.append("<dict ({} keys)>".format(len(arg)))
else:
# Format your args the way you prefer
for key, value in kwargs.items():
# Format your kwargs the same way as above
# if ... :
# kwargsrepr.append(...)
# Create the task
new_task = super().s(*args, **kwargs)
# Use our representations as JSON
return new_task.apply_async(
argsrepr=json.dumps(argsrepr),
kwargsrepr=json.dumps(kwargsrepr)
)
然后我使用这个 class 作为我使用 base
参数的任务的基础:
@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
return "OK !"
这样,任务的参数表示存储为 JSON,之后可以在 Flower 的 format_task()
中加载,并将它们用作对象而不是字符串:
def format_task(task):
argsrepr = json.loads(task.args)
kwargsrepr = json.loads(task.kwargs)
if not argsrepr:
task.args = "( )"
else:
task.args = ', '.join(argsrepr)
if not kwargsrepr:
task.kwargs = "( )"
else:
task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
# [...]
这样,args 显示如下:
Args: <list (3 items)>, <binary content>
Kwargs: callback = <function>, items = <dict (5 items)>
我使用 Flower 来监控我的 Celery 任务。
我正在尝试更改任务的显示方式(在 Tasks 选项卡下)以使列表看起来更“有条理”。例如,显示 <list (6 items)>
而不是 [1, 2, 3, ...
.
不幸的是,overriding the format_task
method 有局限性:
task.args
和task.kwargs
是字符串表示,通常被截断,而不是 list/dict- HTML 对除
task.name
之外的每个字段进行转义
- 如果函数没有 return 值,显示任务时会抛出 AJAX 错误
为了取回原来的 args
和 kwargs
对象,我使用 eval(task.args)
以便之后可以遍历它们的项目。评估随机字符串对我来说有点不安全,你能推荐一个比这样做更好的方法吗?
我找到了一个解决方案,可以将 format_task
中的任务参数作为对象(lists/dicts 等)而不是截断的字符串表示。
apply_async()
方法的 argsrepr
和 kwargsrepr
参数允许为任务的参数指定自定义表示。
我创建了一个自定义 Task class,重写了 delay
方法,如下所示:
import json
from celery import Task
class FlowerTask(Task):
def delay(self, *args, **kwargs):
argsrepr, kwargsrepr = [], {}
for arg in args:
if isinstance(arg, bytes):
argsrepr.append("<binary content>")
elif isinstance(arg, list):
argsrepr.append("<list ({} items)>".format(len(arg)))
elif isinstance(arg, dict):
argsrepr.append("<dict ({} keys)>".format(len(arg)))
else:
# Format your args the way you prefer
for key, value in kwargs.items():
# Format your kwargs the same way as above
# if ... :
# kwargsrepr.append(...)
# Create the task
new_task = super().s(*args, **kwargs)
# Use our representations as JSON
return new_task.apply_async(
argsrepr=json.dumps(argsrepr),
kwargsrepr=json.dumps(kwargsrepr)
)
然后我使用这个 class 作为我使用 base
参数的任务的基础:
@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
return "OK !"
这样,任务的参数表示存储为 JSON,之后可以在 Flower 的 format_task()
中加载,并将它们用作对象而不是字符串:
def format_task(task):
argsrepr = json.loads(task.args)
kwargsrepr = json.loads(task.kwargs)
if not argsrepr:
task.args = "( )"
else:
task.args = ', '.join(argsrepr)
if not kwargsrepr:
task.kwargs = "( )"
else:
task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
# [...]
这样,args 显示如下:
Args:
<list (3 items)>, <binary content>
Kwargs:
callback = <function>, items = <dict (5 items)>