在 airflow GKEStartPodOperator 操作符中配置卷

Configure volumes im airflow GKEStartPodOperator opertor

我有一个 google 云作曲家环境。在我的 DAG 中,我想在 GKE 中创建一个 pod。当我部署一个基于不需要任何卷配置或机密的 docker 容器的简单应用程序时,一切正常,例如:

kubernetes_max = GKEStartPodOperator(
    # The ID specified for the task.
    task_id="python-simple-app",
    # Name of task you want to run, used to generate Pod ID.
    name="python-demo-app",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["python", "app.py"],
    namespace="production",
    image="gcr.io/path/to/lab-python-job:latest",
)

但是当我有一个应用程序需要访问我的 GKE 集群卷时,我需要在我的 pod 中配置卷。问题是文档对此并不清楚。我曾经 foud 唯一的例子是:

volume = k8s.V1Volume(
    name='test-volume',
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
)

虽然我的清单文件中的卷(我用它从本地部署我的应用程序)看起来像这样:

volumes:
  - name: volume-prod
    secret:
      secretName: volume-prod
      items:
        - key: config
          path: config.json
        - key: another_config
          path: another_config.conf
        - key: random-ca
          path: random-ca.pem

因此,当我比较两个卷在控制台中的样子时(当我手动部署成功的清单文件时 运行,以及当我使用失败的 clod composer 部署 pod 时):

我如何从 cloud composer 配置我的 pod,使其可以读取我的集群的卷?

KubernetesPodOperator/GKEStartOperator 只是 python Kubernetes sdk 的包装器——我同意 Airflow/Cloud Composer 文档中没有很好地记录它,但是Python Kubernetes 的 SDK 本身有很好的文档记录。

从这里开始使用 kubernetes python sdk 文档:https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodSpec.md

您会注意到 KubernetesPodOperator/GKEStartOperator 采用的参数符合此规范。如果您深入研究 Operator 的源代码,您会发现 Operator 只不过是一个创建 kubernetes.client.models.V1Pod 对象并使用 API 部署 pod 的构建器。

operator takes a volumes parameter which should be of type List[V1Volume], where the documentation for V1Volume is here.

因此,在您的情况下,您需要提供:

from kubernetes.client import models as k8s

kubernetes_max = GKEStartPodOperator(
    # The ID specified for the task.
    task_id="python-simple-app",
    # Name of task you want to run, used to generate Pod ID.
    name="python-demo-app",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["python", "app.py"],
    namespace="production",
    image="gcr.io/path/to/lab-python-job:latest",
    volumes=[
        k8s.V1Volume(
            secret=k8s.V1SecretVolumeSource(
                secret_name="volume-prod",
                items=[
                    k8s.V1KeyToPath(key="config", path="config.json"),
                    k8s.V1KeyToPath(key="another_config", path="another_config.conf"),
                    k8s.V1KeyToPath(key="random-ca", path="random-ca.pem"),
                ],
            )
        )
    ]
)

或者,您可以将您的清单提供给 GKEStartPodOperator 中的 pod_template_file 参数 - 这将需要供 airflow 内部的工作人员使用。

有 3 种方法可以使用此运算符在 Airflow 中创建 pods:

  1. 使用操作员的参数指定您需要的内容,并让操作员为您构建 V1Pod
  2. 通过传入 pod_template_file 参数提供清单。
  3. 使用 Kubernetes sdk 自己创建一个 V1Pod 对象并将其传递给 full_pod_spec 参数。