Airflow DockerOperator mounts cause an error in docker-compose based Airflow setup

I'm running an Airflow 2.1.4 installation using docker-compose with the Celery executor. So far I have been able to start and run simple DockerOperator tasks from the celery worker container, but now that I try to mount a directory from a shared drive into the task container, I get an error (log file below). The DAG works fine if I don't define the mounts parameter, so I'm guessing some information or privilege isn't being passed on to the container created by the celery worker container. Any suggestions on what to try next?

DAG file:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator

default_args = {
    'owner'                 : 'airflow',
    'description'           : 'Use of the DockerOperator',
    'depends_on_past'       : False,
    'start_date'            : datetime(2018, 1, 3),
    'email_on_failure'      : False,
    'email_on_retry'        : False,
    'retries'               : 1,
    'retry_delay'           : timedelta(minutes=5)
}
with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date'
    )
    t2 = DockerOperator(
        task_id='docker_command',
        image='centos:latest',
        api_version='auto',
        auto_remove=True,
        environment={
            'AF_EXECUTION_DATE': "{{ ds }}",
            'AF_OWNER': "{{ task.owner }}"
        },
        #command="/bin/date > /airflow/dev/tmp/date.txt",
        command="/bin/sleep 5",
        docker_url='unix://var/run/docker.sock',
        mounts=["/airflow/dev/tmp:/airflow/dev/tmp"],
        network_mode='bridge'
    )
    t3 = BashOperator(
        task_id='print_hello',
        bash_command='echo "hello world"'
    )
    t1 >> t2 >> t3

Log file:

*** Reading local file: /opt/airflow/logs/docker_dag/docker_command/2021-11-03T13:51:12.410942+00:00/1.log
[2021-11-03 13:51:14,663] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2021-11-03T13:51:12.410942+00:00 [queued]>
[2021-11-03 13:51:14,676] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2021-11-03T13:51:12.410942+00:00 [queued]>
[2021-11-03 13:51:14,677] {taskinstance.py:1095} INFO - 
--------------------------------------------------------------------------------
[2021-11-03 13:51:14,677] {taskinstance.py:1096} INFO - Starting attempt 1 of 2
[2021-11-03 13:51:14,677] {taskinstance.py:1097} INFO - 
--------------------------------------------------------------------------------
[2021-11-03 13:51:14,682] {taskinstance.py:1115} INFO - Executing <Task(DockerOperator): docker_command> on 2021-11-03T13:51:12.410942+00:00
[2021-11-03 13:51:14,686] {standard_task_runner.py:52} INFO - Started process 1174765 to run task
[2021-11-03 13:51:14,688] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'docker_dag', 'docker_command', '2021-11-03T13:51:12.410942+00:00', '--job-id', '8647', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/docker-dag.py', '--cfg-path', '/tmp/tmpbgq__vi1', '--error-file', '/tmp/tmp7yyvqapv']
[2021-11-03 13:51:14,689] {standard_task_runner.py:77} INFO - Job 8647: Subtask docker_command
[2021-11-03 13:51:14,721] {logging_mixin.py:109} INFO - Running <TaskInstance: docker_dag.docker_command 2021-11-03T13:51:12.410942+00:00 [running]> on host my-made-up-host.com
[2021-11-03 13:51:14,760] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=docker_dag
AIRFLOW_CTX_TASK_ID=docker_command
AIRFLOW_CTX_EXECUTION_DATE=2021-11-03T13:51:12.410942+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-11-03T13:51:12.410942+00:00
[2021-11-03 13:51:14,779] {docker.py:246} INFO - Starting docker container from image centos:latest
[2021-11-03 13:51:14,782] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 268, in _raise_for_status
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http+docker://localhost/v1.41/containers/create

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 343, in execute
    return self._run_image()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 253, in _run_image
    return self._run_image_with_mounts(self.mounts + [tmp_mount], add_tmp_variable=True)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 293, in _run_image_with_mounts
    tty=self.tty,
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 428, in create_container
    return self.create_container_from_config(config, name)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 439, in create_container_from_config
    return self._result(res, True)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 274, in _result
    self._raise_for_status(response)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 270, in _raise_for_status
    raise create_api_error_from_http_exception(e)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
    raise cls(e, response=response, explanation=explanation)
docker.errors.APIError: 500 Server Error for http+docker://localhost/v1.41/containers/create: Internal Server Error ("json: cannot unmarshal string into Go struct field HostConfig.HostConfig.Mounts of type mount.Mount")
[2021-11-03 13:51:14,784] {taskinstance.py:1513} INFO - Marking task as UP_FOR_RETRY. dag_id=docker_dag, task_id=docker_command, execution_date=20211103T135112, start_date=20211103T135114, end_date=20211103T135114
[2021-11-03 13:51:14,823] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-11-03 13:51:14,848] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

I haven't tried this myself, but does it help? You are currently passing a list of strings to mounts, but according to the documentation you should pass a list of Mount instances. That matches the error in your log: the Docker daemon cannot unmarshal a plain string into a `mount.Mount` struct.

mounts (list[docker.types.Mount]) -- List of volumes to mount into the container. Each item should be a docker.types.Mount instance.

Reference: https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html

Example of mounts in a DockerOperator: https://github.com/apache/airflow/blob/main/airflow/providers/docker/example_dags/example_docker_copy_data.py#L70