Airflow DockerOperator unable to mount tmp directory correctly
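I am trying to run a simple Python script, scheduled by Airflow, inside a docker run command.
I followed the Airflow init instructions.
My .env file:
AIRFLOW_UID=1000
AIRFLOW_GID=0
The docker-compose.yaml is based on the default docker-compose.yaml. I had to add
- /var/run/docker.sock:/var/run/docker.sock
as an additional volume in order to run Docker inside Docker.
My DAG is configured as follows: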
""" this is an example dag """
from datetime import timedelta

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.dates import days_ago
from docker.types import Mount

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['info@foo.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 10,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'msg_europe_etl',
    default_args=default_args,
    description='Process MSG_EUROPE ETL',
    schedule_interval=timedelta(minutes=15),
    start_date=days_ago(0),
    tags=['satellite_data'],
) as dag:

    download_and_store = DockerOperator(
        task_id='download_and_store',
        image='satellite_image:latest',
        auto_remove=True,
        api_version='1.41',
        mounts=[Mount(source='/home/archive_1/archive/satellite_data',
                      target='/app/data'),
                Mount(source='/home/dlassahn/projects/forecast-system/meteoIntelligence-satellite',
                      target='/app')],
        command="python3 src/scripts.py download_satellite_images "
                "{{ (execution_date - macros.timedelta(hours=4)).strftime('%Y-%m-%d %H:%M') }} "
                "'msg_europe' ",
    )

    download_and_store
Airflow log:
[2021-08-03 17:23:58,691] {docker.py:231} INFO - Starting docker container from image satellite_image:latest
[2021-08-03 17:23:58,702] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 268, in _raise_for_status
response.raise_for_status()
File "/home/airflow/.local/lib/python3.6/site-packages/requests/models.py", line 943, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http+docker://localhost/v1.41/containers/create
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 319, in execute
return self._run_image()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 258, in _run_image
tty=self.tty,
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 430, in create_container
return self.create_container_from_config(config, name)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 441, in create_container_from_config
return self._result(res, True)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 274, in _result
self._raise_for_status(response)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 270, in _raise_for_status
raise create_api_error_from_http_exception(e)
File "/home/airflow/.local/lib/python3.6/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
raise cls(e, response=response, explanation=explanation)
docker.errors.APIError: 400 Client Error for http+docker://localhost/v1.41/containers/create: Bad Request ("invalid mount config for type "bind": bind source path does not exist: /tmp/airflowtmp037k87u6")
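Trying to set mount_tmp_dir=False led to a DAG ImportError because of the unknown keyword argument mount_tmp_dir. (This might be an issue with the documentation.)
Nevertheless, I do not know how to configure the tmp directory correctly.
My Airflow version: 2.1.2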
There was a bug in Docker Provider 2.0.0 that prevented the Docker Operator from running in a Docker-in-Docker setup.
You need to upgrade to the latest Docker Provider, 2.1.0:
https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/index.html#id1
You can do this by extending the image as described in https://airflow.apache.org/docs/docker-stack/build.html#extending-the-image with, for example, this Dockerfile:
FROM apache/airflow
RUN pip install --no-cache-dir apache-airflow-providers-docker==2.1.0
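After rebuilding the image, it can help to confirm which provider version the Airflow containers actually see. The following is a minimal sketch, not part of the provider: the helper names are hypothetical, the 2.1.0 threshold is taken from this answer, and `importlib.metadata` assumes Python 3.8 or newer (the traceback in the question shows Python 3.6, where the `importlib_metadata` backport would be needed instead).

```python
import re
from importlib import metadata  # standard library from Python 3.8 onwards

PROVIDER_PACKAGE = "apache-airflow-providers-docker"
MIN_VERSION = (2, 1, 0)  # threshold stated in the answer, not queried from Airflow


def parse_version(version):
    """Return the first three numeric components of a version string as a tuple."""
    numbers = [int(part) for part in re.findall(r"\d+", version)[:3]]
    # Pad short versions such as "2.1" so that tuple comparison stays meaningful.
    return tuple(numbers + [0] * (3 - len(numbers)))


def supports_mount_tmp_dir(version):
    """True if the given provider version is at least MIN_VERSION."""
    return parse_version(version) >= MIN_VERSION


def installed_provider_version():
    """Return the installed provider version string, or None if it is missing."""
    try:
        return metadata.version(PROVIDER_PACKAGE)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_provider_version()
    if found is None:
        print(f"{PROVIDER_PACKAGE} is not installed")
    else:
        print(found, "supports mount_tmp_dir:", supports_mount_tmp_dir(found))
```

Running this inside the scheduler or worker container (rather than on the host) is what matters, since that is where the DAG file is imported.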
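In that case the operator works out of the box in "fallback" mode (with a warning message), but you can also disable the mount that causes the problem. More explanation from https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html: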
By default, a temporary directory is created on the host and mounted
into a container to allow storing files that together exceed the
default disk size of 10GB in a container. In this case The path to the
mounted directory can be accessed via the environment variable
AIRFLOW_TMP_DIR.
If the volume cannot be mounted, warning is printed and an attempt is
made to execute the docker command without the temporary folder
mounted. This is to make it works by default with remote docker engine
or when you run docker-in-docker solution and temporary directory is
not shared with the docker engine. Warning is printed in logs in this
case.
If you know you run DockerOperator with remote engine or via
docker-in-docker you should set mount_tmp_dir parameter to False. In
this case, you can still use mounts parameter to mount already
existing named volumes in your Docker Engine to achieve similar
capability where you can store files exceeding default disk size of
the container,
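Applied to the task from the question, the quoted advice could look roughly like the sketch below. It assumes Docker Provider 2.1.0 or later is installed; `DOCKER_TASK_KWARGS` and `build_task` are hypothetical names used only for illustration, and the mounts from the question are left out for brevity.

```python
# Keyword arguments for DockerOperator. The values are copied from the question;
# mount_tmp_dir=False is the parameter described in the quoted documentation.
DOCKER_TASK_KWARGS = {
    "task_id": "download_and_store",
    "image": "satellite_image:latest",
    "auto_remove": True,
    "api_version": "1.41",
    # Do not bind-mount a temporary directory created inside the Airflow container;
    # the Docker daemon on the host cannot see that path.
    "mount_tmp_dir": False,
    "command": (
        "python3 src/scripts.py download_satellite_images "
        "{{ (execution_date - macros.timedelta(hours=4)).strftime('%Y-%m-%d %H:%M') }} "
        "'msg_europe' "
    ),
}


def build_task(dag):
    """Create the DockerOperator for the given DAG (hypothetical helper)."""
    # Imported here so the arguments above can be inspected without Airflow installed.
    from airflow.providers.docker.operators.docker import DockerOperator

    return DockerOperator(dag=dag, **DOCKER_TASK_KWARGS)
```

Inside the `with DAG(...) as dag:` block of the question, `build_task(dag)` would then replace the inline `DockerOperator(...)` call. With Provider 2.0.0 this still fails with the unknown keyword argument error reported in the question, so the upgrade has to come first.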
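I had the same issue, and all the "recommended" ways of solving it here, including setting the mount_dir parameter, just led to other errors. The one thing that helped me was wrapping the code called by Docker in a VPN connection (this hack was actually taken from another Docker-powered DAG that used a VPN and worked well).
So the final solution looks like this: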
#!/bin/bash
connect_to_vpn.sh &
sleep 10
python3 my_func.py
sleep 10
stop_vpn.sh
wait -n
exit $?
To connect to the VPN I used openconnect. It can be installed with apt install and supports the anyconnect protocol (which was my key requirement).
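The same wrapper can be sketched in Python, which makes it easier to return the exit code of the job itself (the bash version exits with the status of the background VPN job). This is only an illustration under assumptions: `run_with_vpn` is a hypothetical helper, the script names are the ones from the bash script above, and the `popen`, `run`, and `sleep` parameters exist only so the function can be exercised without a real VPN.

```python
import subprocess
import time


def run_with_vpn(job_cmd,
                 connect_cmd=("connect_to_vpn.sh",),
                 stop_cmd=("stop_vpn.sh",),
                 settle_seconds=10,
                 popen=subprocess.Popen,
                 run=subprocess.run,
                 sleep=time.sleep):
    """Start the VPN in the background, run the job, always stop the VPN.

    Returns the exit code of the job command.
    """
    vpn = popen(list(connect_cmd))      # like `connect_to_vpn.sh &`
    try:
        sleep(settle_seconds)           # give the tunnel time to come up
        result = run(list(job_cmd))     # like `python3 my_func.py`
        return result.returncode
    finally:
        run(list(stop_cmd))             # like `stop_vpn.sh`, also on errors
        try:
            vpn.wait(timeout=settle_seconds)
        except subprocess.TimeoutExpired:
            vpn.kill()                  # do not hang if the VPN client lingers


if __name__ == "__main__":
    raise SystemExit(run_with_vpn(["python3", "my_func.py"]))
```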