使用 Python asyncio 同时通过 SSH 和 Ping 连接到主机?
SSH and Ping to hosts concurrently with Python asyncio?
我正在尝试 SSH/Ping 同时连接到主机,但我没有看到任何结果,可能是我的实施不正确。这是我到目前为止所拥有的。任何想法表示赞赏。
import paramiko
import time
import asyncio
import subprocess
async def sshTest(ipaddress,deviceUsername,devicePassword,sshPort): #finalDict
try:
print("Performing SSH Connection to the device")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ipaddress, username=deviceUsername, password=devicePassword, port=sshPort, look_for_keys=False, allow_agent=False)
print("Channel established")
except Exception as e:
print(e)
async def pingf(ip):
p1 = subprocess.Popen(['ping', '-c','5', ip], stdout=subprocess.PIPE)
output = p1.communicate()[0]
print(output)
async def main():
taskA = loop.create_task(sshTest('192.168.255.68','admin','admin','22'))
taskB = loop.create_task(sshTest('192.168.254.108','admin','admin','22'))
taskC = loop.create_task(sshTest('192.168.249.134','admin','admin','22'))
taskD = loop.create_task(sshTest('192.168.254.108','admin','admin','22'))
task1 = loop.create_task(pingf('192.168.255.68'))
task2 = loop.create_task(pingf('192.168.254.108'))
task3 = loop.create_task(pingf('192.168.249.134'))
task4 = loop.create_task(pingf('192.168.254.108'))
await asyncio.wait([taskA,taskB,taskC,taskD,task1,task2,task3,task4])
if __name__ == "__main__":
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
print("The time of execution of above program is :", end-start)
Asyncio 是 cooperative multitasking 的一种形式。这意味着为了让任务 运行 并发,任务必须 明确地将控制权 交还给调度程序,这在 Python 中意味着“你的任务需要 await
在某事上。
您的任务都不会调用 await
,因此它们不会同时调用 运行。你现在拥有的是运行连续
如果你想同时 运行 ssh
个连接,你将不得不:
- 将
paramiko
替换为 AsyncSSH 之类的东西,它是为与 asyncio
一起编写的,或者
- 使用线程或多处理来并行化您的任务,而不是使用
asyncio
。
此外,如果您正在使用 asyncio
,任何涉及 运行 外部命令的东西(例如您的 pingf
任务)都需要使用 asyncio
的run_in_executor
方法。
对于您在此处显示的示例,我建议改为使用 concurrent.futures
模块。您的代码最终可能看起来像这样(我在我的测试环境中将代码修改为 运行 并为 sshTest
任务提供了一些不仅仅是连接的任务):
import concurrent.futures
import paramiko
import asyncio
import subprocess
def sshTest(ipaddress, deviceUsername, devicePassword, sshPort): # finalDict
try:
print("Performing SSH Connection to the device")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
ipaddress,
username=deviceUsername,
password=devicePassword,
port=sshPort,
look_for_keys=True,
allow_agent=True,
)
stdin, stdout, stderr = client.exec_command("sh -c 'sleep 2; uptime'")
output = stdout.read()
return output
except Exception:
return "failed to connect"
def pingf(ip):
output = subprocess.check_output(["ping", "-c", "5", ip])
return output
def main():
futures = []
with concurrent.futures.ThreadPoolExecutor() as pool:
futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2200"))
futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2201"))
futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2202"))
futures.append(pool.submit(pingf, "192.168.1.1"))
futures.append(pool.submit(pingf, "192.168.1.5"))
futures.append(pool.submit(pingf, "192.168.1.254"))
for future in concurrent.futures.as_completed(futures):
print("return value from task:", future.result())
if __name__ == "__main__":
main()
我正在尝试 SSH/Ping 同时连接到主机,但我没有看到任何结果,可能是我的实施不正确。这是我到目前为止所拥有的。任何想法表示赞赏。
import paramiko
import time
import asyncio
import subprocess
async def sshTest(ipaddress,deviceUsername,devicePassword,sshPort): #finalDict
try:
print("Performing SSH Connection to the device")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ipaddress, username=deviceUsername, password=devicePassword, port=sshPort, look_for_keys=False, allow_agent=False)
print("Channel established")
except Exception as e:
print(e)
async def pingf(ip):
p1 = subprocess.Popen(['ping', '-c','5', ip], stdout=subprocess.PIPE)
output = p1.communicate()[0]
print(output)
async def main():
taskA = loop.create_task(sshTest('192.168.255.68','admin','admin','22'))
taskB = loop.create_task(sshTest('192.168.254.108','admin','admin','22'))
taskC = loop.create_task(sshTest('192.168.249.134','admin','admin','22'))
taskD = loop.create_task(sshTest('192.168.254.108','admin','admin','22'))
task1 = loop.create_task(pingf('192.168.255.68'))
task2 = loop.create_task(pingf('192.168.254.108'))
task3 = loop.create_task(pingf('192.168.249.134'))
task4 = loop.create_task(pingf('192.168.254.108'))
await asyncio.wait([taskA,taskB,taskC,taskD,task1,task2,task3,task4])
if __name__ == "__main__":
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
print("The time of execution of above program is :", end-start)
Asyncio 是 cooperative multitasking 的一种形式。这意味着为了让任务 运行 并发,任务必须 明确地将控制权 交还给调度程序,这在 Python 中意味着“你的任务需要 await
在某事上。
您的任务都不会调用 await
,因此它们不会同时调用 运行。你现在拥有的是运行连续
如果你想同时 运行 ssh
个连接,你将不得不:
- 将
paramiko
替换为 AsyncSSH 之类的东西,它是为与asyncio
一起编写的,或者 - 使用线程或多处理来并行化您的任务,而不是使用
asyncio
。
此外,如果您正在使用 asyncio
,任何涉及 运行 外部命令的东西(例如您的 pingf
任务)都需要使用 asyncio
的run_in_executor
方法。
对于您在此处显示的示例,我建议改为使用 concurrent.futures
模块。您的代码最终可能看起来像这样(我在我的测试环境中将代码修改为 运行 并为 sshTest
任务提供了一些不仅仅是连接的任务):
import concurrent.futures
import paramiko
import asyncio
import subprocess
def sshTest(ipaddress, deviceUsername, devicePassword, sshPort): # finalDict
try:
print("Performing SSH Connection to the device")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
ipaddress,
username=deviceUsername,
password=devicePassword,
port=sshPort,
look_for_keys=True,
allow_agent=True,
)
stdin, stdout, stderr = client.exec_command("sh -c 'sleep 2; uptime'")
output = stdout.read()
return output
except Exception:
return "failed to connect"
def pingf(ip):
output = subprocess.check_output(["ping", "-c", "5", ip])
return output
def main():
futures = []
with concurrent.futures.ThreadPoolExecutor() as pool:
futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2200"))
futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2201"))
futures.append(pool.submit(sshTest, "localhost", "root", "admin", "2202"))
futures.append(pool.submit(pingf, "192.168.1.1"))
futures.append(pool.submit(pingf, "192.168.1.5"))
futures.append(pool.submit(pingf, "192.168.1.254"))
for future in concurrent.futures.as_completed(futures):
print("return value from task:", future.result())
if __name__ == "__main__":
main()