Unable to run script within Airflow DockerOperator

I am trying to run a simple Python script inside a docker run command scheduled with Airflow.

I have followed the instructions here: Airflow init

My .env file:

AIRFLOW_UID=1000
AIRFLOW_GID=0

The docker-compose.yaml is the default docker-compose.yaml.

My DAG is configured as follows:

""" this is an example dag """
from datetime import timedelta

from airflow import DAG

# Airflow 2.x provider import path; airflow.operators.docker_operator is deprecated.
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from docker.types import Mount

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['info@foo.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 10,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'msg_europe_etl',
    default_args=default_args,
    description='Process MSG_EUROPE ETL',
    schedule_interval=timedelta(minutes=15),
    start_date=days_ago(0),
    tags=['satellite_data'],
) as dag:

    download_and_store = DockerOperator(
        task_id='download_and_store',
        image='satellite_image:latest',
        auto_remove=True,
        api_version='1.41',
        network_mode="overlay",
        mounts=[Mount(source='/home/archive_1/archive/satellite_data',
                      target='/app/data'),
                Mount(source='/home/dlassahn/projects/forecast-system/meteoIntelligence-satellite',
                      target='/app')],
        command="python3 src/scripts.py download_satellite_images "
                     "{{ (execution_date - macros.timedelta(hours=4)).strftime('%Y-%m-%d %H:%M') }} "
                     "'msg_europe' ",
    )

    download_and_store

But the error in the Airflow log is, I guess, caused by missing access to the Docker socket:

[2021-08-03 16:09:19,968] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.6/http/client.py", line 980, in send
    self.connect()
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 410, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.6/http/client.py", line 980, in send
    self.connect()
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 300, in execute
    if self.force_pull or not self.cli.images(name=self.image):
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/image.py", line 98, in images
    res = self._result(self._get(self._url("/images/json"), params=params),
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 237, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 555, in get
    return self.request('GET', url, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

My Airflow version: 2.1.2

Output of grep /etc/group -e "docker": docker:x:998:username

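You need to map the Docker socket into the Airflow container for the DockerOperator to work. The operator talks to the Docker daemon over a Unix socket (by default /var/run/docker.sock), and the FileNotFoundError in your traceback means that this socket does not exist inside the container that runs the task. Basically, you need a "docker-in-docker" setup.

See option 1) here: https://devopscube.com/run-docker-in-docker/. That is probably what you want: you only need to add the appropriate mount to the docker-compose file, typically /var/run/docker.sock:/var/run/docker.sock in the volumes of the Airflow services. Once the socket is mounted, the user running Airflow inside the container also needs read and write permission on it.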
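
To confirm whether the mount is in place, a small check like the one below can be run inside the container that executes the task. This is only a minimal sketch using the Python standard library: the function name `check_docker_socket` is made up for illustration, and the path assumes the usual default socket location, which may differ on your host.

```python
import os
import stat

# Assumption: the Docker daemon socket is at its usual default location.
DOCKER_SOCKET = "/var/run/docker.sock"


def check_docker_socket(path=DOCKER_SOCKET):
    """Return a short status string describing whether `path` is a usable Unix socket."""
    try:
        mode = os.stat(path).st_mode
    except FileNotFoundError:
        # Same condition as the FileNotFoundError in the traceback:
        # nothing is mounted at this path inside the container.
        return "missing"
    if not stat.S_ISSOCK(mode):
        # Something exists at the path, but it is not a Unix socket.
        return "not a socket"
    if not os.access(path, os.R_OK | os.W_OK):
        # The socket is mounted, but the current user may not read/write it.
        return "permission denied"
    return "ok"


if __name__ == "__main__":
    print(DOCKER_SOCKET, "->", check_docker_socket())
```

A result of "missing" matches the error in the question and points at the compose mount. A result of "permission denied" would instead point at the user or group the Airflow process runs as, which is where the docker group id from /etc/group becomes relevant.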