Airflow DockerOperator volumes and mounts

We run Airflow with Docker Compose and have several active DAGs. Last week we updated Airflow to version 2.1.3. Since then, our DAGs that use DockerOperator fail with this error:

airflow.exceptions.AirflowException: Invalid arguments were passed to DockerOperator (task_id: t_docker). Invalid arguments were:
**kwargs: {'volumes':

I found this release note, which says:

The volumes parameter in airflow.providers.docker.operators.docker.DockerOperator and airflow.providers.docker.operators.docker_swarm.DockerSwarmOperator was replaced by the mounts parameter

So I changed our DAG from this:

t_docker = DockerOperator(
        task_id='t_docker',
        image='customimage:latest',
        container_name='custom_1',
        api_version='auto',
        auto_remove=True,
        volumes=['/home/airflow/scripts:/opt/airflow/scripts','/home/airflow/data:/opt/airflow/data'],
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        dag=dag
    )

to this:

t_docker = DockerOperator(
        task_id='t_docker',
        image='customimage:latest',
        container_name='custom_1',
        api_version='auto',
        auto_remove=True,
        mounts=['/home/airflow/scripts:/opt/airflow/scripts','/home/airflow/data:/opt/airflow/data'],
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        dag=dag
    )

But now I get this error:

docker.errors.APIError: 500 Server Error for http+docker://localhost/v1.41/containers/create?name=custom_1: Internal Server Error ("json: cannot unmarshal string into Go struct field HostConfig.HostConfig.Mounts of type mount.Mount")

What am I doing wrong?

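The change is not only in the parameter name. The value also changed: mounts expects a list of docker.types.Mount objects, not "source:target" strings. That is why the Docker API reports that it cannot unmarshal a string into mount.Mount.

You should replace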

volumes=['/home/airflow/scripts:/opt/airflow/scripts','/home/airflow/data:/opt/airflow/data']

with:

mounts=[
    Mount(source="/home/airflow/scripts", target="/opt/airflow/scripts", type="bind"),
    Mount(source="/home/airflow/data", target="/opt/airflow/data", type="bind"),
]

So your code will be:

from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

t_docker = DockerOperator(
    task_id='t_docker',
    image='customimage:latest',
    container_name='custom_1',
    api_version='auto',
    auto_remove=True,
    mounts=[
        Mount(source="/home/airflow/scripts", target="/opt/airflow/scripts", type="bind"),
        Mount(source="/home/airflow/data", target="/opt/airflow/data", type="bind"),
    ],
    docker_url='unix://var/run/docker.sock',
    network_mode='bridge',
    dag=dag
)
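
If you have many DAGs to migrate, rewriting each volume string by hand gets tedious. The following is only a rough sketch of how the old strings could be converted: volume_to_mount_kwargs is a hypothetical helper (not part of Airflow or the Docker SDK), and it assumes POSIX-style specs of the form source:target or source:target:options, always treated as bind mounts. It returns plain keyword arguments so the parsing can be checked without the docker package installed.

```python
from typing import Any, Dict


def volume_to_mount_kwargs(spec: str) -> Dict[str, Any]:
    """Hypothetical helper: turn an old-style 'source:target[:options]'
    volume string into keyword arguments for docker.types.Mount.

    Assumption: paths contain no extra colons (no Windows drive letters)
    and every entry is a bind mount, as in the question.
    """
    parts = spec.split(":")
    if len(parts) == 2:
        source, target = parts
        options = ""
    elif len(parts) == 3:
        source, target, options = parts
    else:
        raise ValueError(f"Unsupported volume spec: {spec!r}")
    return {
        "source": source,
        "target": target,
        "type": "bind",
        # Options may be comma-separated, e.g. "ro" or "ro,z".
        "read_only": "ro" in options.split(","),
    }


old_volumes = [
    "/home/airflow/scripts:/opt/airflow/scripts",
    "/home/airflow/data:/opt/airflow/data",
]
mount_kwargs = [volume_to_mount_kwargs(v) for v in old_volumes]

# With the docker package installed, these become Mount objects:
#   from docker.types import Mount
#   mounts = [Mount(**kw) for kw in mount_kwargs]
```

The resulting mounts list can then be passed to DockerOperator(mounts=...) exactly as in the corrected code above the helper.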