将自定义 volumeMount 添加到 Airflow worker pods(使用 k8s 执行器)

Add custom volumeMount into Airflow worker pods (with k8s Executor)

在 airflow.conf 的 kubernetes 部分下,有一个为 dag 和日志安装 volumeMount 的选项,例如

[kubernetes]
airflow_configmap = airflow_config
worker_container_repository = airflow
worker_container_tag = runner2
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = true
dags_volume_claim = airflow-dags-pvc
dags_volume_subpath = airflow/development/dags
logs_volume_claim = airflow-logs-pvc
logs_volume_subpath = airflow/development/logs
namespace = development

这按预期工作。意味着我可以看到 worker pods 成功地将这两个卷及其相关的 volumeMounts 安装在容器中。

   "volumeMounts": [
      {
        "name": "airflow-dags",
        "readOnly": true,
        "mountPath": "/usr/local/airflow/dags",
        "subPath": "airflow/development/dags"
      },
      {
        "name": "airflow-logs",
        "mountPath": "/usr/local/airflow/logs",
        "subPath": "airflow/development/logs"
      },

但是,我的工作人员 pods 依赖于从目录 airflow/development/pluginsairflow/development/libs 中获取自定义气流插件。因此,我需要使用来自 NFS 服务器的相关子路径将更多 volumeMount 添加到 worker pod 中。我怎样才能做到这一点?我尝试搜索任何相关的配置值,但找不到。

更新: 我将 executor_config 作为 executor_config={"KubernetesExecutor": {"image": "airflow:runner2"}} 传递到 dags 传感器任务之一。通过查看代码 airflow/contrib/kubernetes/worker_configuration.py ,似乎如果我将 volumes 和 volumeMounts 作为此配置的一部分传递,它应该可以工作。我会试试这个并在这里更新。

抱歉,我要回答我自己的问题。也许会帮助某人。在我定义任务的 dag 文件中,我只需按如下方式添加 executor_config;

        IngestionStatusSensor(
            task_id=...,
            executor_config={"KubernetesExecutor": {
                                  "image": "airflow:runner2",
                                  "volume_mounts": [
                                      {
                                          "name": "airflow-dags",
                                          "mountPath": "/usr/local/airflow/libs",
                                          "subPath": "airflow/development/libs"
                                      },
                                      {
                                          "name": "airflow-dags",
                                          "mountPath": "/usr/local/airflow/plugins",
                                          "subPath": "airflow/development/plugins"
                                      }],
                                  }
                             },
            dag=dag,
            ingestion_feed=table,
            poke_interval=60 * 30,
            offset=0,
            start_date=start_date
        )

其中 airflow-dags 卷已由 pod 创建者定义,该创建者声明在 airflow.conf 的 kubernetes 部分下的配置中定义的 PVC,例如dags_volume_claim = airflow-pvc.