How to run a DockerOperator task from Airflow which itself runs in a Docker container?
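I have a Docker container running on my Windows machine, built with an adapted version of the docker-compose file provided in the official docs.
This works fine, but I want to move the Python scripts that make up my tasks out of the mounted plugins folder and into their own Docker containers.
To test this, I created a simple "Hello World!" example script: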
import numpy as np


def main():
    print('Hello World')
    print(np.random.random((3, 3)))


if __name__ == '__main__':
    main()
together with the following simple Dockerfile:
FROM python:3.9
ADD main.py .
RUN pip install numpy
CMD ["python", "./main.py"]
I can build my image with
docker build -t docker-test-image .
Running
docker run --name docker-test-container docker-test-image
from the CLI gives me the expected output:
Hello World
[[0.20923763 0.25415024 0.95603957]
[0.01320074 0.58392589 0.24175036]
[0.06431375 0.87276564 0.9912474 ]]
So far so good, but when I trigger a DAG that uses the DockerOperator, it fails and I get several
FileNotFoundError: [Errno 2] No such file or directory
errors in the logs.
My DAG script looks like this:
from datetime import timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='docker_test_dag',
    description='Testing the docker operator',
    schedule_interval=None,
    start_date=days_ago(2),
    catchup=False,
    tags=['docker_test'],
    default_args={
        'owner': 'airflow',
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'depends_on_past': False,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:
    docker_test_task = DockerOperator(
        task_id='docker_test_task',
        image='docker-test-image',
        api_version='auto',
        auto_remove=True,
        mount_tmp_dir=False,
        container_name='docker-test-container',
        # Closing double quote added; the original string left it unterminated.
        command='echo "this is a test message shown from within the container"',
        # The operator talks to the Docker daemon through this socket.
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
    )

    docker_test_task
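After some research, I believe this is a Docker-in-Docker problem, and the most likely solution is the one found in this tutorial. I added
- /var/run/docker.sock:/var/run/docker.sock
to the volumes section of my Airflow docker-compose file. Now the DAG still fails, with the following errors in the log: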
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/requests/adapters.py", line 450, in send
    timeout=timeout
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1281, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1327, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1276, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1036, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 976, in send
    self.connect()
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 30, in connect
    sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', PermissionError(13, 'Permission denied'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 214, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 237, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/home/airflow/.local/lib/python3.7/site-packages/requests/sessions.py", line 542, in get
    return self.request('GET', url, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', PermissionError(13, 'Permission denied'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 360, in execute
    self.cli = self._get_cli()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 390, in _get_cli
    return APIClient(base_url=self.docker_url, version=self.api_version, tls=tls_config)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 197, in __init__
    self._version = self._retrieve_server_version()
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 222, in _retrieve_server_version
    f'Error while fetching server API version: {e}'
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))
How should this be done?
I finally found the answer:
In the docker-compose file, instead of
- /var/run/docker.sock:/var/run/docker.sock
use the following on a Windows machine:
- //var/run/docker.sock:/var/run/docker.sock
Small change, big effect, I guess.
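To check from inside the Airflow container whether the socket mount is actually usable, a small diagnostic like the one below may help. It is only a sketch: the function name check_docker_socket and its status strings are made up for illustration, and it assumes the socket is expected at /var/run/docker.sock, as in the compose volume line. It uses only the Python standard library and does not talk to the Docker daemon itself.

```python
import os
import stat


def check_docker_socket(path="/var/run/docker.sock"):
    """Return a short status string describing the Docker socket at `path`.

    Hypothetical helper for illustration; not part of Airflow or docker-py.
    """
    if not os.path.exists(path):
        # Typical when the volume mount is missing or the host path was wrong.
        return "missing"
    if not stat.S_ISSOCK(os.stat(path).st_mode):
        # A wrong mount can leave a plain file or directory at this path.
        return "not-a-socket"
    if not os.access(path, os.R_OK | os.W_OK):
        # Corresponds to the PermissionError(13, 'Permission denied') case.
        return "permission-denied"
    return "ok"


if __name__ == "__main__":
    print(check_docker_socket())
```

Running this with the same user that executes the Airflow tasks (for example via docker exec into the worker container) should show which of the cases applies. A result of "ok" only means the socket file is reachable and accessible to that user, not that the daemon behind it responds.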