如何在气流中为 kubernetes pod 动态构建资源(V1ResourceRequirements)对象

How to dynamically build a resources (V1ResourceRequirements) object for a kubernetes pod in airflow

我目前正在将 DAG 从 airflow 版本 1.10.10 迁移到 2.0.0。

此 DAG 使用自定义 python 运算符,它根据任务的复杂性动态分配资源。 问题是 v1.10.10 中使用的导入 (airflow.contrib.kubernetes.pod import Resources) 不再有效。我读到 v2.0.0 我应该使用 kubernetes.client.models.V1ResourceRequirements,但我需要动态构建此资源对象。 这听起来可能很愚蠢,但我一直无法找到构建此对象的正确方法。

例如,我试过

            self.resources = k8s.V1ResourceRequirements(
                request_memory=get_k8s_resources_mapping(resource_request)['memory'],
                limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
                request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
                limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
            )

            self.resources = k8s.V1ResourceRequirements(
                requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
                          'memory': get_k8s_resources_mapping(resource_request)['memory']},
                limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
                        'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
            )

(get_k8s_resources_mapping(resource_request)['xxxx'] 只是 returns 一个取决于 resource_request 的值,例如 '2Gi' 代表内存或 ' 2' cpu)

但它们似乎不起作用。任务失败。

所以,我的问题是,您将如何在 Python 中正确构建 V1ResourceRequirements? 而且,它应该如何看待任务实例的 executor_config 属性?大概是这样的吧?

'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}

正确的语法例如:

from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

KubernetesPodOperator(
    ...,
    resources = client.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "8G"},
        limits={"cpu": "16000m", "memory": "128G"}
    )
)

如果您想动态生成它,只需将 requests/limits 中的值替换为 returns 和 expected string.

的函数即可