Airflow SSHOperator's Socket exception: Bad file descriptor
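In Airflow, I use the SSHOperator to call an API that handles some automation work. The job runs successfully and does generate the report, but Airflow marks the task as failed because of a socket exception.
This error happens only occasionally, and I'd like to know what causes it.
Error message received: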
[2021-07-20 08:00:07,345] {ssh.py:109} INFO - Running command: curl -u <user:pw> <URL>
[2021-07-20 08:00:07,414] {ssh.py:145} WARNING - % Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
[2021-07-20 08:00:08,420] {ssh.py:145} WARNING -
0 0 0 0 0 0 0 0 --:--:-- 0:00:01 --:--:-- 0
[2021-07-20 08:00:09,421] {ssh.py:145} WARNING -
0 0 0 0 0 0 0 0 --:--:-- 0:00:02 --:--:-- 0
[2021-07-20 08:00:10,423] {ssh.py:145} WARNING -
0 0 0 0 0 0 0 0 --:--:-- 0:00:03 --:--:-- 0
[2021-07-20 08:00:10,615] {ssh.py:141} INFO - Report Sent Successfully.
[2021-07-20 08:00:10,616] {transport.py:1819} ERROR - Socket exception: Bad file descriptor (9)
[2021-07-20 08:00:10,633] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 152, in execute
stdout.channel.close()
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/channel.py", line 671, in close
self.transport._send_user_message(m)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/transport.py", line 1863, in _send_user_message
self._send_message(data)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/transport.py", line 1839, in _send_message
self.packetizer.send_message(data)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/packet.py", line 431, in send_message
self.write_all(out)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/packet.py", line 367, in write_all
raise EOFError()
EOFError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1336, in _execute_task
result = task_copy.execute(context=context)
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 171, in execute
raise AirflowException(f"SSH operator error: {str(e)}")
airflow.exceptions.AirflowException: SSH operator error:
--- EDIT ---
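from airflow.providers.ssh.operators.ssh import SSHOperator

generate_report = SSHOperator(
    task_id='generate_report',
    ssh_conn_id='ssh_123',
    # The remote shell command must be passed as a string
    command='curl -u user:password "http://localhost:1234/path/to/trigger/report_creation_API?async=false"',
)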
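This is a race condition in the paramiko library. One line above this close() call we shut down the read side of the channel, and at the very beginning of the same method we shut down the write side. This means that after the second shutdown the channel should be closed, and that is likely what happens inside paramiko.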
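However, this appears to happen asynchronously in a separate thread. Depending on which thread gets there first, the socket may already be closed by the time we call close() in the operator. If paramiko's asynchronous thread is faster, we end up trying to close an already-closed socket, and the error is raised.
This is a classic race condition.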
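The losing ordering of this race can be reproduced outside paramiko with a minimal standard-library sketch, assuming a Unix-like OS: one thread closes a socket first, and a later write on that socket fails with errno 9 (EBADF), the same "Bad file descriptor (9)" reported in the log. The function `simulate_close_race` is hypothetical and uses plain sockets, not paramiko's actual code paths; the ordering is forced with `join()` so the failure is deterministic rather than intermittent.

```python
import errno
import socket
import threading


def simulate_close_race() -> int:
    """Reproduce the losing ordering of the close race with plain sockets.

    Hypothetical illustration: a background thread (standing in for
    paramiko's transport thread) closes the socket first; the main thread
    (standing in for the operator's close() call) then tries to write on it.
    Returns the errno of the resulting OSError, or 0 if the write succeeded.
    """
    local, remote = socket.socketpair()

    def background_close() -> None:
        # The "asynchronous" side wins the race and tears the socket down.
        local.close()

    worker = threading.Thread(target=background_close)
    worker.start()
    worker.join()  # force the losing ordering deterministically

    try:
        # Closing an SSH channel requires sending a message over the socket.
        local.send(b"channel-close")
        return 0
    except OSError as exc:
        return exc.errno
    finally:
        remote.close()


if __name__ == "__main__":
    code = simulate_close_race()
    # On Linux this is expected to print: 9 EBADF
    print(code, errno.errorcode.get(code))
```

In the real failure the ordering depends on thread scheduling, which is why the error appears only some of the time.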
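Since closing an already-closed connection is essentially a no-op, we can safely ignore this kind of exception. That is what I just did in this PR:
https://github.com/apache/airflow/pull/17528
It will most likely be released in the next wave of providers (probably in August).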
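A minimal sketch of the approach described in the answer, ignoring an exception raised while closing a channel whose socket is already gone. This is not the code from the PR; the helper name `close_channel_quietly`, the stand-in channel classes, and the choice to catch `EOFError` (the exception in the traceback) and `OSError` are assumptions for illustration.

```python
import logging

log = logging.getLogger("ssh_close_sketch")


def close_channel_quietly(channel) -> bool:
    """Close a channel, tolerating an already-closed underlying socket.

    Hypothetical helper, not Airflow's actual code. `channel` is any object
    with a close() method (e.g. a paramiko Channel). If another thread has
    already torn the socket down, close() can raise EOFError (as seen in the
    traceback) or OSError; since the channel ends up closed either way, the
    error is logged as a warning instead of failing the task.
    Returns True on a clean close, False if an exception was ignored.
    """
    try:
        channel.close()
        return True
    except (EOFError, OSError):
        log.warning("Ignoring exception while closing channel", exc_info=True)
        return False


class AlreadyClosedChannel:
    """Hypothetical stand-in for a channel whose socket is already gone."""

    def close(self) -> None:
        raise EOFError()


class HealthyChannel:
    """Hypothetical stand-in for a channel that closes normally."""

    def close(self) -> None:
        pass


if __name__ == "__main__":
    print(close_channel_quietly(HealthyChannel()))        # True
    print(close_channel_quietly(AlreadyClosedChannel()))  # False, plus a logged warning
```

Logging at warning level rather than silently passing keeps the race visible in the task log without failing a task whose remote command already succeeded.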