Copying one file to multiple remote hosts in parallel over SFTP

I want to copy a local file to multiple remote hosts in parallel using Python. I'm trying to do it with asyncio and Paramiko, since I'm already using those libraries for other purposes in my program.

I'm using BaseEventLoop.run_in_executor() with the default ThreadPoolExecutor, which is effectively the new interface to the old threading library, along with Paramiko's SFTP functionality to do the copying.

Here's a simplified example.

import sys
import asyncio
import paramiko
import functools


def copy_file_node(
        *,
        user: str,
        host: str,
        identity_file: str,
        local_path: str,
        remote_path: str):
    # Set up the SSH client; automatically trust unknown host keys.
    ssh_client = paramiko.client.SSHClient()
    ssh_client.load_system_host_keys()
    ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())

    ssh_client.connect(
        username=user,
        hostname=host,
        key_filename=identity_file,
        timeout=3)

    with ssh_client:
        with ssh_client.open_sftp() as sftp:
            print("[{h}] Copying file...".format(h=host))
            sftp.put(localpath=local_path, remotepath=remote_path)
            print("[{h}] Copy complete.".format(h=host))


loop = asyncio.get_event_loop()

tasks = []

# NOTE: You'll have to update the values being passed in to
#       `functools.partial(copy_file_node, ...)`
#       to get this working on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
    task = loop.run_in_executor(
        None,
        functools.partial(
            copy_file_node,
            user='user',
            host=host,
            identity_file='/path/to/identity_file',
            local_path='/path/to/local/file',
            remote_path='/path/to/remote/file'))
    tasks.append(task)

try:
    loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
    print("At least one node raised an error:", e, file=sys.stderr)
    sys.exit(1)

loop.close()

The problem I'm seeing is that the files get copied to the hosts serially instead of in parallel. So if the copy takes 5 seconds for a single host, it takes 10 seconds for two hosts, and so on.

I've tried various other approaches, including ditching SFTP and piping the file to dd on each remote host via exec_command(), but the copies always happen serially.

I'm probably misunderstanding some fundamental idea here. What's keeping the different threads from copying the file in parallel?

From my testing, it appears that the blocking happens on the remote write, not on reading the local file. But why would that be, given that we're attempting network I/O against independent remote hosts?

I'm not sure this is the best way to handle it, but it worked for me:

#start
from multiprocessing import Process

#omitted

tasks = []
for host in hosts:
    p = Process(
        target=functools.partial(
            copy_file_node,
            user=user,
            host=host,
            identity_file=identity_file,
            local_path=local_path,
            remote_path=remote_path))
    tasks.append(p)

for t in tasks:
    t.start()
for t in tasks:
    t.join()

Per the comments, I added a datestamp and captured the output of the multiprocessing runs, and got this:

2015-10-24 03:06:08.749683[vagrant1] Copying file...
2015-10-24 03:06:08.751826[basement] Copying file...
2015-10-24 03:06:08.757040[upstairs] Copying file...
2015-10-24 03:06:16.222416[vagrant1] Copy complete.
2015-10-24 03:06:18.094373[upstairs] Copy complete.
2015-10-24 03:06:22.478711[basement] Copy complete.
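
An alternative that keeps the asyncio structure from the question is to pass an explicit ProcessPoolExecutor to run_in_executor. A minimal sketch, assuming copy_file_node is defined at module level (so it can be pickled) and that user, hosts, and the paths are set as above:

import asyncio
import functools
from concurrent.futures import ProcessPoolExecutor

# On platforms that spawn processes (e.g. Windows), wrap the code
# below in an `if __name__ == '__main__':` guard.
loop = asyncio.get_event_loop()

# Each copy runs in its own process, sidestepping any per-process
# locking inside the library.
with ProcessPoolExecutor() as pool:
    tasks = [
        loop.run_in_executor(
            pool,
            functools.partial(
                copy_file_node,
                user=user,
                host=host,
                identity_file=identity_file,
                local_path=local_path,
                remote_path=remote_path))
        for host in hosts]
    loop.run_until_complete(asyncio.gather(*tasks))

loop.close()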

There is nothing wrong with your usage of asyncio.

To prove it, let's try a simplified version of your script - no paramiko, just pure Python.

import asyncio, functools, sys, time

START_TIME = time.monotonic()

def log(msg):
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))

def dummy(thread_id):
    log('Thread {} started'.format(thread_id))
    time.sleep(1)
    log('Thread {} finished'.format(thread_id))

loop = asyncio.get_event_loop()
tasks = []
for i in range(0, int(sys.argv[1])):
    task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i))
    tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

With two threads, this prints:

$ python3 async.py 2
  0.001 Thread 0 started
  0.002 Thread 1 started       <-- 2 tasks are executed concurrently
  1.003 Thread 0 finished
  1.003 Thread 1 finished      <-- Total time is 1 second

This concurrency scales up to 5 threads:

$ python3 async.py 5
  0.001 Thread 0 started
  ...
  0.003 Thread 4 started       <-- 5 tasks are executed concurrently
  1.002 Thread 0 finished
  ...
  1.005 Thread 4 finished      <-- Total time is still 1 second

If we add one more thread, we hit the thread pool limit:

$ python3 async.py 6
  0.001 Thread 0 started
  0.001 Thread 1 started
  0.002 Thread 2 started
  0.003 Thread 3 started
  0.003 Thread 4 started       <-- 5 tasks are executed concurrently
  1.002 Thread 0 finished
  1.003 Thread 5 started       <-- 6th task is executed after 1 second
  1.003 Thread 1 finished
  1.004 Thread 2 finished
  1.004 Thread 3 finished
  1.004 Thread 4 finished      <-- 5 task are completed after 1 second
  2.005 Thread 5 finished      <-- 6th task is completed after 2 seconds

Everything is as expected: the total time grows by 1 second per every 5 items. The magic number 5 is documented in the ThreadPoolExecutor docs:

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
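
If the default pool size is too small for your workload, you don't have to rely on that default: run_in_executor accepts an explicit executor as its first argument. A minimal sketch, reusing the same dummy task as above:

import asyncio, functools, time
from concurrent.futures import ThreadPoolExecutor

START_TIME = time.monotonic()

def log(msg):
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))

def dummy(thread_id):
    log('Thread {} started'.format(thread_id))
    time.sleep(1)
    log('Thread {} finished'.format(thread_id))

loop = asyncio.get_event_loop()
# Size the pool explicitly instead of relying on the 5-per-CPU default.
executor = ThreadPoolExecutor(max_workers=20)
tasks = [loop.run_in_executor(executor, functools.partial(dummy, thread_id=i))
         for i in range(20)]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

With max_workers=20, all 20 tasks start immediately and the total time stays at about 1 second.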

How can a third-party library block my ThreadPoolExecutor?

  • The library uses some kind of global lock. It means that the library doesn't support multithreading (see the sketch after this list). Try using ProcessPoolExecutor, but with caution: the library may contain other anti-patterns, such as using the same hardcoded temporary file name.

  • The function performs long computations without releasing the GIL. It may indicate a bug in C extension code, but the most common reason for holding the GIL is doing some CPU-intensive computation. Again, you can try ProcessPoolExecutor, since it isn't affected by the GIL.

Neither of these is expected to happen with a library like paramiko.
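
To illustrate the first point, here's a sketch of how a single library-wide lock serializes otherwise independent tasks. The lock here is hypothetical, standing in for whatever a non-thread-safe library might hold internally:

import asyncio, functools, threading, time

START_TIME = time.monotonic()
# Hypothetical global lock, as a non-thread-safe library might hold.
GLOBAL_LOCK = threading.Lock()

def log(msg):
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))

def locked_dummy(thread_id):
    with GLOBAL_LOCK:  # only one thread can be in here at a time
        log('Thread {} started'.format(thread_id))
        time.sleep(1)
        log('Thread {} finished'.format(thread_id))

loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(None, functools.partial(locked_dummy, thread_id=i))
         for i in range(3)]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

Even though all three tasks are submitted at once, the lock makes them run one after another, for 3 seconds total - exactly the serial behaviour described in the question.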

How can a third-party library block my ProcessPoolExecutor?

It usually can't. Your tasks are executed in separate processes. If you see that two tasks in a ProcessPoolExecutor take twice the time, suspect a resource bottleneck (such as consuming 100% of the network bandwidth).
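
A quick way to test the bandwidth theory is to time one copy and then time several in parallel. A rough sketch, assuming copy_file_node from the question and hypothetical host addresses:

import functools
import time
from concurrent.futures import ProcessPoolExecutor, wait

def timed_copies(hosts):
    # Wall-clock time for one copy per host, each in its own process.
    start = time.monotonic()
    with ProcessPoolExecutor() as pool:
        futures = [
            pool.submit(functools.partial(
                copy_file_node,
                user='user',
                host=host,
                identity_file='/path/to/identity_file',
                local_path='/path/to/local/file',
                remote_path='/path/to/remote/file'))
            for host in hosts]
        wait(futures)
    return time.monotonic() - start

t_one = timed_copies(['10.0.0.1'])
t_two = timed_copies(['10.0.0.1', '10.0.0.2'])
# If t_two is close to 2 * t_one while the CPUs sit idle, the uplink
# is probably saturated and locking is not the culprit.
print('1 host: {:.1f}s, 2 hosts: {:.1f}s'.format(t_one, t_two))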