如何在气流中为 kubernetes pod 动态构建资源(V1ResourceRequirements)对象
How to dynamically build a resources (V1ResourceRequirements) object for a kubernetes pod in airflow
我目前正在将 DAG 从 airflow 版本 1.10.10 迁移到 2.0.0。
此 DAG 使用自定义 python 运算符,它根据任务的复杂性动态分配资源。
问题是 v1.10.10 中使用的导入 (airflow.contrib.kubernetes.pod import Resources) 不再有效。我读到 v2.0.0 我应该使用 kubernetes.client.models.V1ResourceRequirements,但我需要动态构建此资源对象。
这听起来可能很愚蠢,但我一直无法找到构建此对象的正确方法。
例如,我试过
self.resources = k8s.V1ResourceRequirements(
request_memory=get_k8s_resources_mapping(resource_request)['memory'],
limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
)
或
self.resources = k8s.V1ResourceRequirements(
requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
'memory': get_k8s_resources_mapping(resource_request)['memory']},
limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
)
(get_k8s_resources_mapping(resource_request)['xxxx'] 只是 returns 一个取决于 resource_request 的值,例如 '2Gi' 代表内存或 ' 2' cpu)
但它们似乎不起作用。任务失败。
所以,我的问题是,您将如何在 Python 中正确构建 V1ResourceRequirements?
而且,它应该如何看待任务实例的 executor_config 属性?大概是这样的吧?
'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}
正确的语法例如:
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
KubernetesPodOperator(
...,
resources = client.V1ResourceRequirements(
requests={"cpu": "1000m", "memory": "8G"},
limits={"cpu": "16000m", "memory": "128G"}
)
)
如果您想动态生成它,只需将 requests/limits 中的值替换为 returns 和 expected string.
的函数即可
我目前正在将 DAG 从 airflow 版本 1.10.10 迁移到 2.0.0。
此 DAG 使用自定义 python 运算符,它根据任务的复杂性动态分配资源。 问题是 v1.10.10 中使用的导入 (airflow.contrib.kubernetes.pod import Resources) 不再有效。我读到 v2.0.0 我应该使用 kubernetes.client.models.V1ResourceRequirements,但我需要动态构建此资源对象。 这听起来可能很愚蠢,但我一直无法找到构建此对象的正确方法。
例如,我试过
self.resources = k8s.V1ResourceRequirements(
request_memory=get_k8s_resources_mapping(resource_request)['memory'],
limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
)
或
self.resources = k8s.V1ResourceRequirements(
requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
'memory': get_k8s_resources_mapping(resource_request)['memory']},
limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
)
(get_k8s_resources_mapping(resource_request)['xxxx'] 只是 returns 一个取决于 resource_request 的值,例如 '2Gi' 代表内存或 ' 2' cpu)
但它们似乎不起作用。任务失败。
所以,我的问题是,您将如何在 Python 中正确构建 V1ResourceRequirements? 而且,它应该如何看待任务实例的 executor_config 属性?大概是这样的吧?
'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}
正确的语法例如:
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
KubernetesPodOperator(
...,
resources = client.V1ResourceRequirements(
requests={"cpu": "1000m", "memory": "8G"},
limits={"cpu": "16000m", "memory": "128G"}
)
)
如果您想动态生成它,只需将 requests/limits 中的值替换为 returns 和 expected string.
的函数即可