Paramiko Sessions 在 Child 进程中关闭传输
Paramiko Sessions Closes Transport in the Child Process
我们正在使用 paramiko 创建一个连接库,该库大量使用其 get_pty
或 invoke_shell
功能。我们的库使用这些渠道与目标设备进行交互。
但是每当我们使用 multiprocessing
库时,我们都无法在 child 进程中使用 paramiko 连接句柄。 transport
在 child 进程中关闭。
Is there a way to tell paramiko not to close the connection/channel at fork.
这是重现问题的示例程序
from paramiko import SSHClient, AutoAddPolicy
from multiprocessing import Process
import logging
log = logging.getLogger("paramiko.transport").setLevel(1)
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
client.connect(hostname="localhost")
def simple_work(handle):
print("==== ENTERED CHILD PROCESS =====")
stdin, stdout, stderr = handle.exec_command("ifconfig")
print(stdout.read())
print("==== EXITING CHILD PROCESS =====")
p = Process(target=simple_work, args=(client,))
p.start()
p.join(2)
print("==== MAIN PROCESS AFTER JOIN =====")
stdin, stdout, stderr = client.exec_command("ls")
print(stdout.read())
这就是错误
==== ENTERED CHILD PROCESS =====
Success for unrequested channel! [??]
==== MAIN PROCESS AFTER JOIN =====
Traceback (most recent call last):
File "repro.py", line 22, in <module>
stdin, stdout, stderr = client.exec_command("ls")
File "/Users/vivejha/Projects/cisco/lib/python3.4/site-packages/paramiko/client.py", line 401, in exec_command
chan = self._transport.open_session(timeout=timeout)
File "/Users/vivejha/Projects/cisco/lib/python3.4/site-packages/paramiko/transport.py", line 702, in open_session
timeout=timeout)
File "/Users/vivejha/Projects/cisco/lib/python3.4/site-packages/paramiko/transport.py", line 823, in open_channel
raise e
paramiko.ssh_exception.SSHException: Unable to open channel.
需要注意的重要事项
如果我尝试访问 child 进程中的 client
。首先它根本不起作用。
其次,主进程中的句柄也莫名其妙的消亡了。我不知道这种 child-to-parent 交流是如何促进的以及为什么。
而且最大的问题是程序最后挂了,异常是可以的,但是挂起是最不期望的。
如果我不在child过程中使用client
,并且做一些其他工作然后在[=63=中使用client
] 进程不受影响,照常工作。
注意:transport.py 中有一个叫做 atfork
的东西声称可以控制这种行为。但令人惊讶的是,即使在该方法中注释代码也没有任何影响。在 paramiko 的整个代码库中也没有对 atfork
的引用。
PS:我正在使用最新的 paramiko,这个程序是 运行 在 Mac
当套接字涉及fork
时,这只是一个基本问题。两个进程共享同一个套接字,但只有一个可以使用它。试想一下,两个不同的进程正在管理一个套接字。他们都处于不同的状态,例如一个可能会向远程端发送和接收数据,而另一个则处于完全不同的加密状态。想想 nonces/initialization 向量,当两个进程发生分歧时,它们在分叉后将无效。
您的问题的解决方案显然是从 MultiProcessing
切换到 MultiThreading
。这样一来,您只有一个在所有线程之间共享的 ssh 连接。如果您真的想使用 fork,则必须为每个 fork 创建一个新连接。
def atfork(self):
"""
Terminate this Transport without closing the session. On posix
systems, if a Transport is open during process forking, both parent
and child will share the underlying socket, but only one process can
use the connection (without corrupting the session). Use this method
to clean up a Transport object without disrupting the other process.
在 paramiko 日志中,您会看到您的父进程从远程端收到一个 SSH_DISCONNECT_MSG,错误为:Packet corrupt
。很可能是由于父级处于不同的加密状态并发送了服务器无法理解的数据包。
DEBUG:lala:==== ENTERED CHILD PROCESS =====
DEBUG:lala:<paramiko.SSHClient object at 0xb74bf1ac>
DEBUG:lala:<paramiko.Transport at 0xb6fed82cL (cipher aes128-ctr, 128 bits) (active; 0 open channel(s))>
DEBUG:paramiko.transport:[chan 1] Max packet in: 34816 bytes
WARNING:paramiko.transport:Success for unrequested channel! [??]
DEBUG:lala:==== MAIN PROCESS AFTER JOIN =====
WARNING:lala:<socket._socketobject object at 0xb706ef7c>
DEBUG:paramiko.transport:[chan 1] Max packet in: 34816 bytes
INFO:paramiko.transport:Disconnect (code 2): Packet corrupt
这是一个使用 concurrent.futures 的基本多线程示例:
from concurrent.futures import ThreadPoolExecutor
def simple_work(handle):
print("==== ENTERED CHILD PROCESS =====")
stdin, stdout, stderr = handle.exec_command("whoami")
print(stdout.read())
print("==== EXITING CHILD PROCESS =====")
with ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(simple_work, client)
print(future.result())
print("==== MAIN PROCESS AFTER JOIN =====")
stdin, stdout, stderr = client.exec_command("echo AFTER && whoami")
print(stdout.read())
另请注意,在大多数情况下,您甚至不需要引入额外的线程。 Paramiko exec_command
已经生成了一个新线程,并且在您尝试从任何伪文件 stdout
、stderr
中读取之前不会阻塞。这意味着,您也可以只执行一些命令并稍后从 stdout 读取。但请记住,由于缓冲区 运行 已满,paramiko 可能会停止。
我们正在使用 paramiko 创建一个连接库,该库大量使用其 get_pty
或 invoke_shell
功能。我们的库使用这些渠道与目标设备进行交互。
但是每当我们使用 multiprocessing
库时,我们都无法在 child 进程中使用 paramiko 连接句柄。 transport
在 child 进程中关闭。
Is there a way to tell paramiko not to close the connection/channel at fork.
这是重现问题的示例程序
from paramiko import SSHClient, AutoAddPolicy
from multiprocessing import Process
import logging
log = logging.getLogger("paramiko.transport").setLevel(1)
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
client.connect(hostname="localhost")
def simple_work(handle):
print("==== ENTERED CHILD PROCESS =====")
stdin, stdout, stderr = handle.exec_command("ifconfig")
print(stdout.read())
print("==== EXITING CHILD PROCESS =====")
p = Process(target=simple_work, args=(client,))
p.start()
p.join(2)
print("==== MAIN PROCESS AFTER JOIN =====")
stdin, stdout, stderr = client.exec_command("ls")
print(stdout.read())
这就是错误
==== ENTERED CHILD PROCESS =====
Success for unrequested channel! [??]
==== MAIN PROCESS AFTER JOIN =====
Traceback (most recent call last):
File "repro.py", line 22, in <module>
stdin, stdout, stderr = client.exec_command("ls")
File "/Users/vivejha/Projects/cisco/lib/python3.4/site-packages/paramiko/client.py", line 401, in exec_command
chan = self._transport.open_session(timeout=timeout)
File "/Users/vivejha/Projects/cisco/lib/python3.4/site-packages/paramiko/transport.py", line 702, in open_session
timeout=timeout)
File "/Users/vivejha/Projects/cisco/lib/python3.4/site-packages/paramiko/transport.py", line 823, in open_channel
raise e
paramiko.ssh_exception.SSHException: Unable to open channel.
需要注意的重要事项
如果我尝试访问 child 进程中的
client
。首先它根本不起作用。其次,主进程中的句柄也莫名其妙的消亡了。我不知道这种 child-to-parent 交流是如何促进的以及为什么。
而且最大的问题是程序最后挂了,异常是可以的,但是挂起是最不期望的。
如果我不在child过程中使用
client
,并且做一些其他工作然后在[=63=中使用client
] 进程不受影响,照常工作。
注意:transport.py 中有一个叫做 atfork
的东西声称可以控制这种行为。但令人惊讶的是,即使在该方法中注释代码也没有任何影响。在 paramiko 的整个代码库中也没有对 atfork
的引用。
PS:我正在使用最新的 paramiko,这个程序是 运行 在 Mac
当套接字涉及fork
时,这只是一个基本问题。两个进程共享同一个套接字,但只有一个可以使用它。试想一下,两个不同的进程正在管理一个套接字。他们都处于不同的状态,例如一个可能会向远程端发送和接收数据,而另一个则处于完全不同的加密状态。想想 nonces/initialization 向量,当两个进程发生分歧时,它们在分叉后将无效。
您的问题的解决方案显然是从 MultiProcessing
切换到 MultiThreading
。这样一来,您只有一个在所有线程之间共享的 ssh 连接。如果您真的想使用 fork,则必须为每个 fork 创建一个新连接。
def atfork(self):
"""
Terminate this Transport without closing the session. On posix
systems, if a Transport is open during process forking, both parent
and child will share the underlying socket, but only one process can
use the connection (without corrupting the session). Use this method
to clean up a Transport object without disrupting the other process.
在 paramiko 日志中,您会看到您的父进程从远程端收到一个 SSH_DISCONNECT_MSG,错误为:Packet corrupt
。很可能是由于父级处于不同的加密状态并发送了服务器无法理解的数据包。
DEBUG:lala:==== ENTERED CHILD PROCESS =====
DEBUG:lala:<paramiko.SSHClient object at 0xb74bf1ac>
DEBUG:lala:<paramiko.Transport at 0xb6fed82cL (cipher aes128-ctr, 128 bits) (active; 0 open channel(s))>
DEBUG:paramiko.transport:[chan 1] Max packet in: 34816 bytes
WARNING:paramiko.transport:Success for unrequested channel! [??]
DEBUG:lala:==== MAIN PROCESS AFTER JOIN =====
WARNING:lala:<socket._socketobject object at 0xb706ef7c>
DEBUG:paramiko.transport:[chan 1] Max packet in: 34816 bytes
INFO:paramiko.transport:Disconnect (code 2): Packet corrupt
这是一个使用 concurrent.futures 的基本多线程示例:
from concurrent.futures import ThreadPoolExecutor
def simple_work(handle):
print("==== ENTERED CHILD PROCESS =====")
stdin, stdout, stderr = handle.exec_command("whoami")
print(stdout.read())
print("==== EXITING CHILD PROCESS =====")
with ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(simple_work, client)
print(future.result())
print("==== MAIN PROCESS AFTER JOIN =====")
stdin, stdout, stderr = client.exec_command("echo AFTER && whoami")
print(stdout.read())
另请注意,在大多数情况下,您甚至不需要引入额外的线程。 Paramiko exec_command
已经生成了一个新线程,并且在您尝试从任何伪文件 stdout
、stderr
中读取之前不会阻塞。这意味着,您也可以只执行一些命令并稍后从 stdout 读取。但请记住,由于缓冲区 运行 已满,paramiko 可能会停止。