Airflow unexpected argument 'mounts'

I am trying to set up an Airflow ETL pipeline that extracts images from .bag files. I want to run the extraction inside Docker, so I am using the DockerOperator. The Docker image is pulled from a private GitLab registry, and the script I want to run is a Python script inside the container. The .bag file is on my external SSD, so I am trying to mount it into the container. Is there a problem with my code, or is it some other kind of issue?

The error:

[2021-09-16 10:39:17,010] {docker.py:246} INFO - Starting docker container from image registry.gitlab.com/url/of/gitlab:a24a3f05
[2021-09-16 10:39:17,010] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 343, in execute
    return self._run_image()
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 265, in _run_image
    return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 287, in _run_image_with_mounts
    privileged=self.privileged,
  File "/usr/lib/python3/dist-packages/docker/api/container.py", line 607, in create_host_config
    return HostConfig(*args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'mounts'
[2021-09-16 10:39:17,014] {taskinstance.py:1512} INFO - Marking task as FAILED. dag_id=ETL-test, task_id=docker_extract, execution_date=20210916T083912, start_date=20210916T083915, end_date=20210916T083917
[2021-09-16 10:39:17,062] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-16 10:39:17,085] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

Here is my code:

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from airflow.operators.bash_operator import BashOperator

ssd_dir = Mount(source='/media/filip/external-ssd', target='/external-ssd', type='bind')

dag = DAG(
    'ETL-test',
    default_args={
        'owner': 'admin',
        'description': 'Extract data from bag, simple test',
        'depends_on_past': False,
        'start_date': datetime(2021, 9, 13),
    },
)

start_dag = DummyOperator(
    task_id='start_dag',
    dag=dag,
)

extract = DockerOperator(
    api_version='auto',
    task_id='docker_extract',
    image='registry.gitlab.com/url/of/gitlab:a24a3f05',
    container_name='extract-test',
    mounts=[ssd_dir],
    auto_remove=True,
    force_pull=False,
    mount_tmp_dir=False,
    command='python3 rgb_image_extraction.py --bagfile /external-ssd/2021-09-01-13-17-10.bag --output_dir /external-ssd/airflow --camera_topic /kirby1/vm0/stereo/left/color/image_rect --every_n_img 20 --timestamp_as_name',
    docker_conn_id='gitlab_registry',
    dag=dag,
)

test = BashOperator(
    task_id='print_hello',
    bash_command='echo "hello world"',
    dag=dag,
)

start_dag >> extract >> test

I think you have an old version of the docker Python library installed. If you want to make sure Airflow 2.1.0 works correctly, you should always use the constraint mechanism described at https://airflow.apache.org/docs/apache-airflow/stable/installation.html; otherwise you can end up with outdated dependencies.

For example, if you are using Python 3.6, the right constraints file is https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.6.txt, and there the docker Python library is pinned to 5.0.0. I bet you have a much older version.
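To illustrate the diagnosis, here is a minimal sketch that classifies a docker SDK version string. Note the 2.6.0 cutoff is my assumption about roughly when docker-py's HostConfig gained the `mounts` keyword; verify it against the changelog of the version you have. The traceback above loading docker from `/usr/lib/python3/dist-packages` suggests a distro-packaged copy, which is typically much older than the pip-installed one:

```python
def version_tuple(v: str) -> tuple:
    """Parse a dotted version string like '5.0.0' into a comparable tuple."""
    return tuple(int(part) for part in v.split(".")[:3])

def supports_mounts(docker_py_version: str) -> bool:
    # Assumption: docker-py gained the `mounts` keyword on HostConfig around
    # release 2.6.0; distro-packaged copies (e.g. under
    # /usr/lib/python3/dist-packages, as in the traceback) are often older
    # and raise exactly the TypeError shown above.
    return version_tuple(docker_py_version) >= (2, 6, 0)

print(supports_mounts("1.9.0"))   # an old distro package -> False
print(supports_mounts("5.0.0"))   # version pinned by the 2.1.3 constraints -> True
```

You can feed it the output of `python3 -c "import docker; print(docker.__version__)"` from the same interpreter that runs Airflow to confirm which copy of the library the scheduler actually imports.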