Airflow SSHOperator's Socket exception: Bad file descriptor

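In Airflow, I use an SSHOperator to call an API that handles some automation work. The job runs successfully and the report is actually generated, but Airflow marks the task as failed because of a socket exception.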

The error happens intermittently, and I'd like to know what causes it.

The error message I get:

[2021-07-20 08:00:07,345] {ssh.py:109} INFO - Running command: curl -u <user:pw> <URL>
[2021-07-20 08:00:07,414] {ssh.py:145} WARNING -   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
[2021-07-20 08:00:08,420] {ssh.py:145} WARNING - 
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
[2021-07-20 08:00:09,421] {ssh.py:145} WARNING - 
  0     0    0     0    0     0      0      0 --:--:--  0:00:02 --:--:--     0
[2021-07-20 08:00:10,423] {ssh.py:145} WARNING - 
  0     0    0     0    0     0      0      0 --:--:--  0:00:03 --:--:--     0
[2021-07-20 08:00:10,615] {ssh.py:141} INFO - Report Sent Successfully.
[2021-07-20 08:00:10,616] {transport.py:1819} ERROR - Socket exception: Bad file descriptor (9)
[2021-07-20 08:00:10,633] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 152, in execute
    stdout.channel.close()
  File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/channel.py", line 671, in close
    self.transport._send_user_message(m)
  File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/transport.py", line 1863, in _send_user_message
    self._send_message(data)
  File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/transport.py", line 1839, in _send_message
    self.packetizer.send_message(data)
  File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/packet.py", line 431, in send_message
    self.write_all(out)
  File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/packet.py", line 367, in write_all
    raise EOFError()
EOFError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1336, in _execute_task
    result = task_copy.execute(context=context)
  File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 171, in execute
    raise AirflowException(f"SSH operator error: {str(e)}")
airflow.exceptions.AirflowException: SSH operator error: 

--- EDIT ---

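from airflow.providers.ssh.operators.ssh import SSHOperator

generate_report = SSHOperator(
    task_id='generate_report',
    ssh_conn_id='ssh_123',
    # The command must be a Python string; the URL is double-quoted so the shell keeps "?async=false" intact.
    command='curl -u user:password "http://localhost:1234/path/to/trigger/report_creation_API?async=false"',
)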

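This is a race condition in the paramiko library. One line above this close(), we shut down the read side of the channel, and at the very beginning of the same method we shut down its write side. After the second shutdown the channel should be closed, and that is most likely what happens inside paramiko.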

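However, this appears to happen asynchronously in a separate thread. Depending on which thread gets there first, the socket may already be closed by the time the operator calls close(). If paramiko's background thread wins, we try to close an already-closed socket and the error is raised.

This is a classic race condition.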
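The losing ordering of that race can be reproduced with the standard library alone. This is a minimal sketch, not paramiko code: it assumes a plain `socket.socketpair()` can stand in for the SSH connection, and the helper `simulate_close_race` and the thread acting as "paramiko's transport thread" are hypothetical. The thread is joined before the write so the demo always shows the "transport wins" case; in the real operator the order depends on scheduling. On Linux the failed write surfaces as errno 9 (EBADF), the same "Bad file descriptor (9)" seen in the log.

```python
import errno
import socket
import threading


def simulate_close_race() -> int:
    """Return the errno raised when the "operator" writes to a socket
    that the "transport thread" has already closed (0 if no error)."""
    # Assumption: a local socket pair stands in for the SSH connection.
    ours, peer = socket.socketpair()

    # Hypothetical stand-in for paramiko's transport thread, which tears the
    # socket down once both directions of the channel are shut down.
    transport_thread = threading.Thread(target=ours.close)
    transport_thread.start()
    # Joining first forces the "transport wins" ordering so the demo is
    # deterministic; in real life the winner depends on thread scheduling.
    transport_thread.join()

    try:
        # The operator's close() still has to send a channel-close message.
        ours.send(b"channel-close")
    except OSError as exc:
        return exc.errno
    finally:
        peer.close()
    return 0


if __name__ == "__main__":
    code = simulate_close_race()
    print(code, errno.errorcode.get(code))
```

paramiko catches the low-level socket error during the write, logs it as "Socket exception", and raises EOFError instead, which is what the traceback above shows coming out of `packet.py`.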

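Since closing an already-closed connection is essentially a no-op, this kind of exception can safely be ignored. That is what I just did in this PR: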

https://github.com/apache/airflow/pull/17528

It will most likely be released in the next wave of providers (probably in August).