Flower 中的高级任务格式化(Celery 监控)

Advanced task formatting in Flower (Celery monitoring)

我使用 Flower 来监控我的 Celery 任务。

我正在尝试更改任务的显示方式(在 Tasks 选项卡下)以使列表看起来更“有条理”。例如,显示 <list (6 items)> 而不是 [1, 2, 3, ....

不幸的是,overriding the format_task method 有局限性:

为了取回原来的 argskwargs 对象,我使用 eval(task.args) 以便之后可以遍历它们的项目。评估随机字符串对我来说有点不安全,你能推荐一个比这样做更好的方法吗?

我找到了一个解决方案,可以将 format_task 中的任务参数作为对象(lists/dicts 等)而不是截断的字符串表示。

apply_async() 方法的 argsreprkwargsrepr 参数允许为任务的参数指定自定义表示。

我创建了一个自定义 Task class,重写了 delay 方法,如下所示:

import json
from celery import Task

class FlowerTask(Task):
    def delay(self, *args, **kwargs):
        argsrepr, kwargsrepr = [], {}

        for arg in args:
            if isinstance(arg, bytes):
                argsrepr.append("<binary content>")
            elif isinstance(arg, list):
                argsrepr.append("<list ({} items)>".format(len(arg)))
            elif isinstance(arg, dict):
                argsrepr.append("<dict ({} keys)>".format(len(arg)))
            else:
                # Format your args the way you prefer
            
        for key, value in kwargs.items():
            # Format your kwargs the same way as above
            # if ... :
            # kwargsrepr.append(...)
        
        # Create the task
        new_task = super().s(*args, **kwargs)

        # Use our representations as JSON
        return new_task.apply_async(
            argsrepr=json.dumps(argsrepr),
            kwargsrepr=json.dumps(kwargsrepr)
        )

然后我使用这个 class 作为我使用 base 参数的任务的基础:

@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
    return "OK !"

这样,任务的参数表示存储为 JSON,之后可以在 Flower 的 format_task() 中加载,并将它们用作对象而不是字符串:

def format_task(task):
    argsrepr = json.loads(task.args)
    kwargsrepr = json.loads(task.kwargs)

    if not argsrepr:
        task.args = "( )"
    else:
        task.args = ', '.join(argsrepr)

    if not kwargsrepr:
        task.kwargs = "( )"
    else:
        task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
    # [...]

这样,args 显示如下:

Args: <list (3 items)>, <binary content>

Kwargs: callback = <function>, items = <dict (5 items)>