使用 SFTP 缓慢上传许多小文件

Slow upload of many small files with SFTP

用SFTP上传100个100字节的文件时,这里用了17秒(连接建立后,我连初始连接时间都不算)。这意味着仅传输 10 KB 需要 17 秒,即 0.59 KB/sec!

我知道向 openwriteclose 等发送 SSH 命令可能会产生很大的开销,但是 有办法吗在使用 SFTP 发送许多小文件时加快处理速度?

或者 paramiko / pysftp 中的一种特殊模式,将所有要执行的写入操作保存在内存缓冲区中(比如说最后 2 秒的所有操作),然后在SSH/SFTP 的一组传球?这样可以避免在每次操作之间等待 ping 时间。

注:

import pysftp, time, os
with pysftp.Connection('1.2.3.4', username='root', password='') as sftp:
    with sftp.cd('/tmp/'):
        t0 = time.time()
        for i in range(100):
            print(i)
            with sftp.open('test%i.txt' % i, 'wb') as f:   # even worse in a+ append mode: it takes 25 seconds
                f.write(os.urandom(100))
        print(time.time() - t0)

我建议您使用来自多个线程的多个连接来并行化上传。这是简单可靠的解决方案。


如果您想通过使用缓冲请求来解决问题,您可以将您的解决方案基于以下朴素示例。

例子:

  • 排队 100 个文件打开请求;
  • 当它读取对打开请求的响应时,它会将写入请求排队;
  • 当它读取对写入请求的响应时,它会将关闭请求排队

如果我对 100 个文件执行纯 SFTPClient.put,大约需要 10-12 秒。使用下面的代码,我的速度提高了 50-100 倍。

但是!代码是真的很幼稚:

  • 它期望服务器以相同的顺序响应请求。事实上,大多数 SFTP 服务器(包括事实上的标准 OpenSSH)以相同的顺序响应。但根据 SFTP 规范,SFTP 服务器可以自由响应任何顺序。
  • 代码期望所有文件读取一次性发生 – upload.localhandle.read(32*1024)。仅适用于小文件。
  • 代码预计 SFTP 服务器可以处理 100 个并行请求和 100 个打开的文件。对于大多数服务器来说这不是问题,因为它们按顺序处理请求。并且打开100个文件对于普通服务器来说应该不是问题。
  • 但是您不能对无限数量的文件执行此操作。您必须以某种方式对文件进行排队,以控制未完成请求的数量。其实这100个请求也太多了
  • 代码使用了 SFTPClient class 的非 public 方法。
  • 我不做Python。肯定有更优雅的编码方式。
import paramiko
import paramiko.sftp
from paramiko.py3compat import long
 
ssh = paramiko.SSHClient()
ssh.connect(...)
 
sftp = ssh.open_sftp()
                      
class Upload:
   def __init__(self):
       pass

uploads = []

for i in range(0, 100):
    print(f"sending open request {i}")
    upload = Upload()
    upload.i = i
    upload.localhandle = open(f"{i}.dat")
    upload.remotepath = f"/remote/path/{i}.dat"
    imode = \
        paramiko.sftp.SFTP_FLAG_CREATE | paramiko.sftp.SFTP_FLAG_TRUNC | \
        paramiko.sftp.SFTP_FLAG_WRITE
    attrblock = paramiko.SFTPAttributes()
    upload.request = \
        sftp._async_request(type(None), paramiko.sftp.CMD_OPEN, upload.remotepath, \
            imode, attrblock)
    uploads.append(upload)

for upload in uploads:
    print(f"reading open response {upload.i}");
    t, msg = sftp._read_response(upload.request)
    if t != paramiko.sftp.CMD_HANDLE:
        raise SFTPError("Expected handle")
    upload.handle = msg.get_binary()

    print(f"sending write request {upload.i} to handle {upload.handle}");
    data = upload.localhandle.read(32*1024)
    upload.request = \
        sftp._async_request(type(None), paramiko.sftp.CMD_WRITE, \
            upload.handle, long(0), data)

for upload in uploads:
    print(f"reading write response {upload.i} {upload.request}");
    t, msg = sftp._read_response(upload.request)
    if t != paramiko.sftp.CMD_STATUS:
        raise SFTPError("Expected status")
    print(f"closing {upload.i} {upload.handle}");
    upload.request = \
        sftp._async_request(type(None), paramiko.sftp.CMD_CLOSE, upload.handle)

for upload in uploads:
    print(f"reading close response {upload.i} {upload.request}");
    sftp._read_response(upload.request)

使用以下方法(100 个异步任务),它在 ~ 0.5 秒内完成,这是一个巨大的改进。

import asyncio, asyncssh  # pip install asyncssh
async def main():
    async with asyncssh.connect('1.2.3.4', username='root', password='') as conn:
        async with conn.start_sftp_client() as sftp:
            print('connected')
            await asyncio.wait([sftp.put('files/test%i.txt' % i) for i in range(100)])
asyncio.run(main())

我会探索来源,但我仍然不知道它是否将许多操作分组在少数 SSH 事务中,或者它是否只是并行运行命令。