Add custom volumeMount into Airflow worker pods (with k8s Executor)
Under the kubernetes section of airflow.cfg, there are options to mount volumeMounts for dags and logs, e.g.
[kubernetes]
airflow_configmap = airflow_config
worker_container_repository = airflow
worker_container_tag = runner2
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = true
dags_volume_claim = airflow-dags-pvc
dags_volume_subpath = airflow/development/dags
logs_volume_claim = airflow-logs-pvc
logs_volume_subpath = airflow/development/logs
namespace = development
This works as expected, meaning I can see the worker pods successfully mount these two volumes, with their associated volumeMounts, in the container:
"volumeMounts": [
{
"name": "airflow-dags",
"readOnly": true,
"mountPath": "/usr/local/airflow/dags",
"subPath": "airflow/development/dags"
},
{
"name": "airflow-logs",
"mountPath": "/usr/local/airflow/logs",
"subPath": "airflow/development/logs"
},
However, my worker pods depend on picking up custom Airflow plugins from the directories airflow/development/plugins and airflow/development/libs. So I need to add more volumeMounts to the worker pods, with the relevant subpaths from the NFS server. How can I do that? I searched for any related configuration value but couldn't find one.
Update: I am already passing executor_config={"KubernetesExecutor": {"image": "airflow:runner2"}} into one of the DAG's sensor tasks. Looking at the code in airflow/contrib/kubernetes/worker_configuration.py, it seems that if I pass volumes and volumeMounts as part of this config, it should work. I'll try this and update here.
Sorry for answering my own question; perhaps it will help someone. In the DAG file where I define my tasks, I just added the executor_config as follows:
IngestionStatusSensor(
task_id=...,
executor_config={"KubernetesExecutor": {
"image": "airflow:runner2",
"volume_mounts": [
{
"name": "airflow-dags",
"mountPath": "/usr/local/airflow/libs",
"subPath": "airflow/development/libs"
},
{
"name": "airflow-dags",
"mountPath": "/usr/local/airflow/plugins",
"subPath": "airflow/development/plugins"
}],
}
},
dag=dag,
ingestion_feed=table,
poke_interval=60 * 30,
offset=0,
start_date=start_date
)
where the airflow-dags volume is already defined by the pod creator, which claims the PVC configured under the kubernetes section of airflow.cfg, e.g. dags_volume_claim = airflow-pvc.
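Building on the same mechanism, the executor_config should also accept a "volumes" list, so a task could define and mount an entirely separate NFS-backed volume instead of reusing airflow-dags. This is a hedged sketch only: the volume name "airflow-libs", the NFS server address, and the export path below are hypothetical placeholders, and the exact accepted keys depend on the contrib worker_configuration code in your Airflow version.

```python
# Sketch: pass both "volumes" and "volume_mounts" through executor_config
# so the worker pod mounts a dedicated NFS volume (all names hypothetical).
executor_config = {
    "KubernetesExecutor": {
        "image": "airflow:runner2",
        # Extra volume definition for the worker pod (hypothetical NFS server).
        "volumes": [
            {
                "name": "airflow-libs",
                "nfs": {
                    "server": "nfs.example.internal",  # placeholder hostname
                    "path": "/exports/airflow",        # placeholder export path
                },
            }
        ],
        # Mount the volume at the paths where plugins/libs are expected.
        "volume_mounts": [
            {
                "name": "airflow-libs",
                "mountPath": "/usr/local/airflow/libs",
                "subPath": "airflow/development/libs",
            },
            {
                "name": "airflow-libs",
                "mountPath": "/usr/local/airflow/plugins",
                "subPath": "airflow/development/plugins",
            },
        ],
    }
}

# Sanity check: every volumeMount must refer to a defined volume name.
defined = {v["name"] for v in executor_config["KubernetesExecutor"]["volumes"]}
assert all(m["name"] in defined
           for m in executor_config["KubernetesExecutor"]["volume_mounts"])
```

The dict would then be passed as executor_config=... to the task, exactly as in the answer above.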