使用 Python asyncio 同时通过 SSH 和 Ping 连接到主机?

SSH and Ping to hosts concurrently with Python asyncio?

我正在尝试 SSH/Ping 同时连接到主机,但我没有看到任何结果,可能是我的实施不正确。这是我到目前为止所拥有的。任何想法表示赞赏。

import paramiko
import time
import asyncio
import subprocess

async def sshTest(ipaddress,deviceUsername,devicePassword,sshPort): #finalDict
    try:
            print("Performing SSH Connection to the device")

            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            client.connect(ipaddress, username=deviceUsername, password=devicePassword, port=sshPort, look_for_keys=False, allow_agent=False)
            print("Channel established")     

    except Exception as e:
        print(e)       

async def pingf(ip):
    p1 = subprocess.Popen(['ping', '-c','5', ip], stdout=subprocess.PIPE)
    output = p1.communicate()[0]
    print(output)

async def main():
    taskA = loop.create_task(sshTest('192.168.255.68','admin','admin','22'))
    taskB = loop.create_task(sshTest('192.168.254.108','admin','admin','22'))
    taskC = loop.create_task(sshTest('192.168.249.134','admin','admin','22'))
    taskD = loop.create_task(sshTest('192.168.254.108','admin','admin','22'))
    task1 = loop.create_task(pingf('192.168.255.68'))
    task2 = loop.create_task(pingf('192.168.254.108'))
    task3 = loop.create_task(pingf('192.168.249.134'))
    task4 = loop.create_task(pingf('192.168.254.108'))
    await asyncio.wait([taskA,taskB,taskC,taskD,task1,task2,task3,task4]) 
    
if __name__ == "__main__":
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    end = time.time()
    print("The time of execution of above program is :", end-start)
    

Asyncio 是 cooperative multitasking 的一种形式。这意味着为了让任务 运行 并发,任务必须 明确地将控制权 交还给调度程序,这在 Python 中意味着“你的任务需要 await 在某事上。

您的任务都不会调用 await,因此它们不会同时调用 运行。你现在拥有的是运行连续

如果你想同时 运行 ssh 个连接,你将不得不:

  • paramiko 替换为 AsyncSSH 之类的东西,它是为与 asyncio 一起编写的,或者
  • 使用线程或多处理来并行化您的任务,而不是使用 asyncio

此外,如果您正在使用 asyncio,任何涉及 运行 外部命令的东西(例如您的 pingf 任务)都需要使用 asynciorun_in_executor方法。


对于您在此处显示的示例,我建议改为使用 concurrent.futures 模块。您的代码最终可能看起来像这样(我在我的测试环境中将代码修改为 运行 并为 sshTest 任务提供了一些不仅仅是连接的任务):

import concurrent.futures

import paramiko
import asyncio
import subprocess


def sshTest(ipaddress, deviceUsername, devicePassword, sshPort):  # finalDict
    try:
        print("Performing SSH Connection to the device")

        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        client.connect(
            ipaddress,
            username=deviceUsername,
            password=devicePassword,
            port=sshPort,
            look_for_keys=True,
            allow_agent=True,
        )
        stdin, stdout, stderr = client.exec_command("sh -c 'sleep 2; uptime'")
        output = stdout.read()
        return output
    except Exception:
        return "failed to connect"


def pingf(ip):
    output = subprocess.check_output(["ping", "-c", "5", ip])
    return output


def main():
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2200"))
        futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2201"))
        futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2202"))
        futures.append(pool.submit(pingf, "192.168.1.1"))
        futures.append(pool.submit(pingf, "192.168.1.5"))
        futures.append(pool.submit(pingf, "192.168.1.254"))

    for future in concurrent.futures.as_completed(futures):
        print("return value from task:", future.result())


if __name__ == "__main__":
    main()