Yet another sftp to s3 file uploading question

Recently I was given the task of transferring large files (> 0.5 GB) over SFTP to S3 storage. As far as I understood from the boto3 manual, I should be using multipart upload. I came across a good tutorial, made some small changes, and tried the code myself. Long story short: it works, but the speed is ridiculous (~150 kB/s), which is especially painful in my case (large files). As far as I can tell, paramiko may be the bottleneck, but my experiments with prefetch and different buffer sizes did not amount to much (the best I got was about 1500 kB/s for the first chunk with prefetch turned on, dropping to 250-300 kB/s for the remaining chunks of a ~1.5 GB file). So, since I have run out of ideas, any hints or suggestions on what to try next would be greatly appreciated. Thanks in advance! Here is my code:
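For reference, the prefetch / buffer-size experiment mentioned above looked roughly like this (a hypothetical sketch, not part of the script below; sftp_connection is an already-open paramiko.SFTPClient):

# Hypothetical sketch of the prefetch / bufsize experiment, not part of the script below.
sftp_file = sftp_connection.file(sftp_path, "rb", bufsize=32768)  # tried several bufsize values here
sftp_file.prefetch()                # let paramiko read ahead in a background thread
chunk = sftp_file.read(26214400)    # then read 25 MB chunks as usual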

AWS session wrapper

aws.py

import boto3
import os

def aws_session(region_name='us-east-1'):
    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
    AWS_ACCESS_KEY_SECRET = os.environ.get('AWS_ACCESS_KEY_SECRET')
    return boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 aws_secret_access_key=AWS_ACCESS_KEY_SECRET,
                                 region_name=region_name)

Main script

stream.py

import paramiko
import math
import time
from aws import aws_session

from dotenv import load_dotenv

def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    try:
        # paramiko.Transport expects the address as a single (host, port) tuple
        transport = paramiko.Transport((sftp_host, sftp_port))
    except Exception:
        return 'conn_error'
    try:
        transport.connect(username=sftp_username, password=sftp_password)
    except Exception:
        return 'auth_error'
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def transfer_chunk_from_sftp_to_s3(sftp_file, s3_connection, multipart_upload, bucket_name,
                                  s3_file_path, part_number, chunk_size):
    start_time = time.time()
    chunk = sftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    end_time = time.time()
    total_seconds = end_time - start_time
    print(
        "speed is {} kb/s total seconds taken {}".format(
           math.ceil((int(chunk_size) / 1024) / total_seconds), total_seconds
        )
    )
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path, 
                                  aws_s3_path, sftp_username, sftp_password, chunk_size=26214400):
    
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password)
    sftp_file = sftp_connection.file(sftp_path, "r", bufsize=-1)    
    s3_connection = aws_session().client('s3')
    sftp_file_size = sftp_file.stat().st_size
    print('file size: ', sftp_file_size)
    chunk_count = int(math.ceil(sftp_file_size / float(chunk_size)))
    print('amount of chunks: ', chunk_count)
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    for i in range(chunk_count):
        print("Transferring chunk {}...".format(i + 1))
        part = transfer_chunk_from_sftp_to_s3(
                  sftp_file,
                  s3_connection,
                  multipart_upload,
                  bucket_name,
                  aws_s3_path,
                  i + 1,
                  chunk_size,
                )
        parts.append(part)
        print("Chunk {} Transferred Successfully!".format(i + 1))

    part_info = {"Parts": parts}
    s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    sftp_file.close()

if __name__ == '__main__':

    load_dotenv()

    bucket_name=''
    sftp_host=''
    sftp_port=int('22')
    sftp_path=''
    aws_s3_path=''
    sftp_username=''
    sftp_password=''


    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=sftp_port,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password
    )

PS: The scenario in which I use this script is the following: I need to accept large incoming files (from anywhere) so they can be processed by a pipeline, but the processing pipeline is isolated and cannot be reached from the outside world (all data exchange happens through S3 storage). The solution I settled on is a small machine in the middle that accepts SFTP credentials and "streams" (i.e. uploads chunk by chunk) the file to S3 storage, from where the pipeline picks it up and does its work.

There are three problems here:

You are timing the download and the upload of each chunk as a single transfer, so if a 1 MB chunk takes 90 seconds to download and 10 seconds to upload, you report about 10 kB/s (1 MB / 100 seconds) instead of roughly 11 kB/s for the download followed by roughly 100 kB/s for the upload. I mention this because it hides the second problem.
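To see why this matters, time the two phases separately; a minimal sketch, reusing sftp_file, s3_connection, multipart_upload, bucket_name and s3_file_path from your transfer_chunk_from_sftp_to_s3:

import time

def transfer_chunk_timed(sftp_file, s3_connection, multipart_upload, bucket_name,
                         s3_file_path, part_number, chunk_size):
    # Time the SFTP read and the S3 upload separately so each speed is visible.
    t0 = time.time()
    chunk = sftp_file.read(chunk_size)
    t1 = time.time()
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    t2 = time.time()
    kb = len(chunk) / 1024
    print("download: {:.0f} kB/s, upload: {:.0f} kB/s".format(
        kb / max(t1 - t0, 1e-6), kb / max(t2 - t1, 1e-6)))
    return {"PartNumber": part_number, "ETag": part["ETag"]}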

Paramiko has had some issues with sustained transfers for some people. This bug on paramiko has more details.

And, beyond those issues, you download a whole chunk and then turn around and upload it. In most environments that is wasteful; even when it runs well, it usually means your download spends half its time waiting on the upload.

You can fix all of these issues:

import paramiko
import boto3
import multiprocessing
import time

# Simple helper to track an activity and report the duration
# and how fast data was transferred
class SpeedTracker:
    def __init__(self):
        self._start = time.time()
        self._end = None
    def start(self):
        self._start = time.time()
    def end(self, bytes, desc):
        self._end = time.time()
        secs = self._end - self._start
        if secs > 0:
            print(f"{desc} done, {(bytes / 1048576) / secs:0.3f} MB/s")

class FastTransport(paramiko.Transport):
    # Correct issues with window size, see paramiko issue 175
    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)

def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    transport = FastTransport((sftp_host, sftp_port))
    # Not necessary, but here for testing purposes, support either
    # password or private key auth
    if sftp_password is not None:
        transport.connect(username=sftp_username, password=sftp_password)
    else:
        pkey = paramiko.RSAKey.from_private_key_file(sftp_key)
        transport.connect(username=sftp_username, pkey=pkey)
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection

def pull_from_sftp(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue):
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password, sftp_key)
    sftp_file = sftp_connection.file(sftp_path, "rb")
    # Enable pipelined mode, see paramiko issue 175
    sftp_file.set_pipelined()
    # Allow the transfer to fill up data in a background thread
    sftp_file.prefetch()
    chunk_size = 8388608
    tracker = SpeedTracker()
    num = 0
    while True:
        # Download one chunk
        tracker.start()
        chunk = sftp_file.read(chunk_size)
        if len(chunk) == 0:
            # All done, time to stop work
            queue.put(None)
            sftp_file.close()
            break
        # Send the chunk off to the reader process
        num += 1
        tracker.end(len(chunk), f"Downloaded chunk #{num}")
        queue.put(chunk)

def send_chunk_to_s3(s3_connection, multipart_upload, bucket_name,
                                  s3_file_path, part_number, chunk):
    # Upload one chunk to S3
    tracker = SpeedTracker()
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    tracker.end(len(chunk), f"Uploaded chunk #{part_number}")
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output

def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path, 
                                  aws_s3_path, sftp_username, sftp_password, sftp_key):
    # Start a worker process to get the data from SFTP
    queue = multiprocessing.Queue(10)
    proc = multiprocessing.Process(target=pull_from_sftp, args=(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue))
    proc.start()

    # And start reading from that worker to upload its results
    s3_connection = boto3.client('s3')
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    while True:
        chunk = queue.get()
        if chunk is None:
            break
        part = send_chunk_to_s3(
            s3_connection,
            multipart_upload,
            bucket_name,
            aws_s3_path,
            len(parts) + 1,
            chunk,
        )
        parts.append(part)

    # All done, clean up and finalize the multipart upload
    proc.join()
    part_info = {"Parts": parts}
    resp = s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    print(resp)

if __name__ == '__main__':
    sftp_host='TODO'
    sftp_port=int('22')
    sftp_path='TODO'
    sftp_username='TODO'
    sftp_password='TODO'
    sftp_key=None
    bucket_name='TODO'
    aws_s3_path='TODO'

    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=sftp_port,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password,
        sftp_key=sftp_key
    )

Testing on my local machine, I saw download and upload speeds of roughly 40 MB/s, which is certainly better than without the bug fixes.
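One caveat worth keeping in mind: if the transfer dies partway through, the incomplete multipart upload and its already-uploaded parts stay in S3 until they are aborted. A minimal cleanup helper, assuming the same s3_connection, bucket_name, aws_s3_path and multipart_upload names as above (the helper name is just for illustration):

def abort_incomplete_upload(s3_connection, bucket_name, aws_s3_path, multipart_upload):
    # Call this from an except/finally branch around the transfer so S3 discards
    # the already-uploaded parts instead of keeping (and billing for) them.
    s3_connection.abort_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
    )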