Use PythonOperator with mounted PersistentVolumeClaim
I have a simple Airflow DAG with 2 operators (a PythonOperator and a KubernetesPodOperator):
with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")

    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)
    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")
    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")

    start = DummyOperator(task_id="start", dag=dag)

    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)

    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)

    start >> download_data >> test
The goal is to have one persistent volume shared by both operators. The KubernetesPodOperator picks up the mount as expected and downloads everything it needs, but the PythonOperator stays in the queued state forever.
Tailing the scheduler pod shows the following error:
Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.VolumeMounts: []v1.VolumeMount: readObjectStart: expect { or n, but found ", error found in #10 byte of ...|-data"}
I suspect the volume/volume mount is not being set up correctly, because in the serialized pod spec the third volumeMounts entry is a string (the repr of the mount object) rather than a JSON object:
...
"volumeMounts": [
    {
        "mountPath": "/opt/airflow/dags",
        "name": "dags-data"
    },
    {
        "mountPath": "/opt/airflow/logs",
        "name": "logs-data"
    },
    "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}"
]
Yet my configuration appears consistent with the Airflow documentation.
The problem was the type of the Volume passed to the PythonOperator. My original example used k8s.v1_volume.V1Volume and k8s.v1_volume_mount.V1VolumeMount; switching to k8s.V1Volume and k8s.V1VolumeMount produced a pod that mounted the volume as expected.
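The fix above can be sketched as follows. This is a minimal sketch, not the poster's exact code: it assumes `from kubernetes.client import models as k8s` (the import style used in the Airflow docs) and Airflow 2.x operator imports, and it reuses `ROOT_PATH` and the claim name from the original DAG:

```python
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

ROOT_PATH = "/osm_config"  # mount path, as in the original DAG
logger = logging.getLogger("airflow.task")

def do_it():
    logger.debug("do work")

with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    # Use the top-level k8s.V1VolumeMount / k8s.V1Volume classes, not the
    # module-qualified k8s.v1_volume_mount.* / k8s.v1_volume.* spellings,
    # so the scheduler serializes them as proper JSON objects.
    volume_mount = k8s.V1VolumeMount(name="osm-config",
                                     mount_path=ROOT_PATH,
                                     sub_path=None,
                                     read_only=False)
    volume = k8s.V1Volume(
        name="osm-config",
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
            claim_name="osm-config-pv-claim"))

    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount])
                                      ],
                                      volumes=[volume]))
                          })
```

With this change the pod_override serializes cleanly and the worker pod mounts the PVC as expected; the KubernetesPodOperator can keep using the same `volume` and `volume_mount` objects.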