Use PythonOperator with mounted PersistentVolumeClaim

I have a simple Airflow DAG with 2 operators (a PythonOperator and a KubernetesPodOperator):

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

ROOT_PATH = "/osm_config"

with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")

    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)

    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")

    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")


    start = DummyOperator(task_id="start", dag=dag)

    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)

    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)


    start >> download_data >> test

The goal is to have one persistent volume shared by both operators. The KubernetesPodOperator picks up the mount as expected and downloads everything it needs. The PythonOperator, however, stays in the queued state forever.

Tailing the scheduler pod shows the following error:

Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.VolumeMounts: []v1.VolumeMount: readObjectStart: expect { or n, but found ", error found in #10 byte of ...|-data"}

I suspect the volume/volume mount is not being set up correctly, because the serialized format looks wrong — the third entry is a stringified Python repr instead of a dict:

...

 "volumeMounts": [ 
   { 
     "mountPath": "/opt/airflow/dags", 
     "name": "dags-data" 
   }, 
   { 
     "mountPath": "/opt/airflow/logs", 
     "name": "logs-data" 
   }, 
   "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}" 
 ] 

Yet my configuration seems to match the Airflow documentation.

The problem is the type of the volume objects passed to the PythonOperator.

My original example used k8s.v1_volume.V1Volume and k8s.v1_volume_mount.V1VolumeMount, but switching to k8s.V1Volume and k8s.V1VolumeMount produced a pod that mounted the volume as expected.