运行 使用 Python Paramiko 在不同的 SSH 服务器中并行执行多个命令

Run multiple commands in different SSH servers in parallel using Python Paramiko

我有一个 SSH.py 的目标是通过 SSH 连接到许多服务器到 运行 一个 Python 脚本 (worker.py)。我正在使用 Paramiko,但我对它很陌生,并且在学习过程中不断学习。在我使用 ssh 的每台服务器上,我需要保留 Python 脚本 运行ning —— 这是为了并行训练模型,因此脚本需要在所有机器上 运行联合更新模型 parameters/train。服务器上的 Python 脚本需要 运行ning 所以要么所有的 SSH 连接都无法关闭,要么我必须想办法让服务器上的 Python 脚本保持 运行ning 即使我关闭连接。

通过广泛的谷歌搜索,您似乎可以通过 nohup 或:

实现此目的
client = paramiko.SSHClient()
client.connect(ip_address, username, password)
transport = client.get_transport()
channel = transport.open_session()
channel.exec_command("python worker.py > /logs/'command output' 2>&1")

但是,我不清楚的是我们如何 close/exit 所有 SSH 连接?我正在 运行 在 cmd.exe 上设置 SSH.py 文件,关闭 cmd.exe 是否足以让所有进程远程关闭?

此外,我对 client.close() 的使用是否符合我的目的? 请在下面查看我的代码。

# SSH.py

import paramiko
import argparse
import os

path = "path"
python_script = "worker.py"

# definitions for ssh connection and cluster
ip_list = ['XXX.XXX.XXX.XXX', XXX.XXX.XXX.XXX', XXX.XXX.XXX.XXX']
port_list = [':XXXX', ':XXXX', ':XXXX']
user_list = ['user', 'user', 'user']
password_list = ['pass', 'pass', 'pass']
node_list = list(map(lambda x: f'-node{x + 1} ', list(range(len(ip_list)))))
cluster = ' '.join([node + ip + port for node, ip, port in zip(node_list, ip_list, port_list)])

# run script on command line of local machine
os.system(f"cd {path} && python {python_script} {cluster} -type worker -index 0 -batch 64 > {path}/logs/'command output'/{ip_list[0]}.log 2>&1")

# loop for IP and password
for i, (ip, user, password) in enumerate(zip(ip_list[1:], user_list[1:], password_list[1:]), 1):
    try:
        print("Open session in: " + ip + "...")
        client = paramiko.SSHClient()
        client.connect(ip, user, password)
        transport = client.get_transport()
        channel = transport.open_session()
    except paramiko.SSHException:
        print("Connection Failed")
        quit()

    try:
        channel.exec_command(f"cd {path} && python {python_script} {cluster} -type worker -index {i} -batch 64 > {path}/logs/'command output'/{ip_list[i]}.log 2>&1", timeout=30)
        client.close() # here I am closing connection but above command should be running, my question is can I safely close cmd.exe on which I am running SSH.py? 
    except paramiko.SSHException:
        print("Cannot run file. Continue with other IPs in list...")
        client.close()
        continue

代码基于Running process of remote SSH server in the background using Python Paramiko

编辑:channel.exec_command() 似乎没有执行命令

f"cd {path} && python {python_script} {cluster} -type worker -index {i} -batch 64 > {path}/logs/'command output'/{ip_list[i]}.log 2>&1"

所以我想知道是不是因为client.close()?如果我用 client.close() 注释掉所有行会发生什么?这会有帮助吗?这很危险吗?当我退出本地 Python 脚本时,这是否会关闭我所有的 SSH 连接,因此不需要 client.close()?

而且我所有的机器都有 Windows OS.

确实,问题是您关闭了 SSH 连接。由于远程进程未与终端分离,关闭终端将终止进程。在 Linux 个服务器上,您可以使用 nohup。我不知道什么是(如果有的话)Windows 等价物。

反正好像不需要关闭连接。我明白了,您可以等待所有命令完成。

stdouts = []
clients = []

# Start the commands
commands = zip(ip_list[1:], user_list[1:], password_list[1:])
for i, (ip, user, password) in enumerate(commands, 1):
    print("Open session in: " + ip + "...")
    client = paramiko.SSHClient()
    client.connect(ip, user, password)
    command = \
        f"cd {path} && " + \
        f"python {python_script} {cluster} -type worker -index {i} -batch 64 " + \
        f"> {path}/logs/'command output'/{ip_list[i]}.log 2>&1"
    stdin, stdout, stderr = client.exec_command(command)
    clients.append(client)
    stdouts.append(stdout)

# Wait for commands to complete
for i in range(len(stdouts)):
    stdouts[i].read()
    clients[i].close()

请注意,上面使用 stdout.read() 的简单解决方案之所以有效,只是因为您将命令输出重定向到远程文件。如果你不是,.

如果没有它(或者如果您想在本地查看命令输出),您将需要这样的代码:

while any(x is not None for x in stdouts):
    for i in range(len(stdouts)):
        stdout = stdouts[i]
        if stdout is not None:
            channel = stdout.channel
            # To prevent losing output at the end, first test for exit,
            # then for output
            exited = channel.exit_status_ready()
            while channel.recv_ready():
                s = channel.recv(1024).decode('utf8')
                print(f"#{i} stdout: {s}")
            while channel.recv_stderr_ready():
                s = channel.recv_stderr(1024).decode('utf8')
                print(f"#{i} stderr: {s}")
            if exited:
                print(f"#{i} done")
                clients[i].close()
                stdouts[i] = None
    time.sleep(0.1)

如果不需要stdout和stderr的分离,使用Channel.set_combine_stderr. See .

可以大大简化代码

关于您关于 SSHClient.close 的问题:如果您不调用它,连接将在脚本完成时隐式关闭,当 Python 垃圾收集器清理挂起的对象时。这是一个不好的做法。即使 Python 不会这样做,本地 OS 也会终止本地 Python 进程的所有连接。这也是一种不好的做法。在任何情况下,这都会终止远程进程。