Yet another sftp to s3 file uploading question
Recently I was given the task of transferring large files (> 0.5 GB) over SFTP into S3 storage. As far as I understood from the boto3 manual, I should use multipart upload. I came across a good tutorial, made a few small changes, and tried the code myself. Long story short: it works, but the speed is ridiculous (~150 KB/s), which is especially painful in my case (large files). From what I can tell, paramiko may be the bottleneck, but my experiments with prefetch and different buffer sizes did not help much (the best I got was about 1500 KB/s on the first chunk (with prefetch on), which then dropped to 250-300 KB/s for the remaining chunks of a ~1.5 GB file). Since I have run out of ideas, any hints or suggestions on what to try next would be greatly appreciated. Thanks in advance! Here is my code:
AWS session wrapper
aws.py
import boto3
import os

def aws_session(region_name='us-east-1'):
    AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
    AWS_ACCESS_KEY_SECRET = os.environ.get('AWS_ACCESS_KEY_SECRET')
    return boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY_ID,
                                 aws_secret_access_key=AWS_ACCESS_KEY_SECRET,
                                 region_name=region_name)
Main script
stream.py
import paramiko
import math
import time

from aws import aws_session
from dotenv import load_dotenv


def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    try:
        transport = paramiko.Transport(sftp_host, sftp_port)
    except Exception as e:
        return 'conn_error'
    try:
        transport.connect(username=sftp_username, password=sftp_password)
    except Exception as identifier:
        return 'auth_error'
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def transfer_chunk_from_sftp_to_s3(sftp_file, s3_connection, multipart_upload, bucket_name,
                                   s3_file_path, part_number, chunk_size):
    start_time = time.time()
    chunk = sftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    end_time = time.time()
    total_seconds = end_time - start_time
    print(
        "speed is {} kb/s total seconds taken {}".format(
            math.ceil((int(chunk_size) / 1024) / total_seconds), total_seconds
        )
    )
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path,
                                  aws_s3_path, sftp_username, sftp_password, chunk_size=26214400):
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password)
    sftp_file = sftp_connection.file(sftp_path, "r", bufsize=-1)
    s3_connection = aws_session().client('s3')
    sftp_file_size = sftp_file._get_size()
    print('file size: ', sftp_file_size)
    chunk_count = int(math.ceil(sftp_file_size / float(chunk_size)))
    print('amount of chunks: ', chunk_count)
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    for i in range(chunk_count):
        print("Transferring chunk {}...".format(i + 1))
        part = transfer_chunk_from_sftp_to_s3(
            sftp_file,
            s3_connection,
            multipart_upload,
            bucket_name,
            aws_s3_path,
            i + 1,
            chunk_size,
        )
        parts.append(part)
        print("Chunk {} Transferred Successfully!".format(i + 1))
    part_info = {"Parts": parts}
    s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    sftp_file.close()


if __name__ == '__main__':
    load_dotenv()
    bucket_name = ''
    sftp_host = ''
    sftp_port = int('22')
    sftp_path = ''
    aws_s3_path = ''
    sftp_username = ''
    sftp_password = ''
    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=22,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password
    )
PS: The scenario I use this script in is the following: I need to accept large incoming files (from anywhere) so that a pipeline can process them, but the processing pipeline is isolated and cannot be reached from the outside world (all data exchange happens through S3 storage). The solution I settled on is a small machine in the middle that accepts SFTP credentials and "streams" (uploads chunk by chunk) the file into S3 storage, from where the pipeline picks it up and does its work.
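As an aside, the chunk-by-chunk relay does not strictly have to be hand-rolled: boto3's upload_fileobj accepts any file-like object with a read() method (a paramiko SFTPFile included) and does the multipart bookkeeping itself. A minimal sketch of that idea, where the function name, bucket, key, part size and concurrency are placeholders rather than anything from my setup:

import boto3
from boto3.s3.transfer import TransferConfig

def stream_sftp_file_to_s3(sftp_file, bucket_name, s3_key):
    # Part size and concurrency are illustrative values, not tuned numbers
    config = TransferConfig(multipart_chunksize=8 * 1024 * 1024,
                            max_concurrency=4)
    s3 = boto3.client('s3')
    # upload_fileobj reads from the file object in chunks and uploads the
    # parts itself, completing the multipart upload at the end
    s3.upload_fileobj(sftp_file, bucket_name, s3_key, Config=config)

Whether this helps depends on the same paramiko read speed discussed below, since it still pulls all data through the SFTP file object.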
There are three problems here:
You time both the download and the upload of a chunk as one transfer, so if downloading 1 MB takes 100 seconds and uploading it takes 10 seconds, you report roughly 9 KB/s (1 MB / 110 s) instead of roughly 10 KB/s for the download followed by roughly 100 KB/s for the upload. I mention this because it hides the second problem.
Paramiko has had problems with sustained transfer throughput for some people. This bug on paramiko has more details.
And, beyond those issues, you download an entire chunk and then turn around and upload it. In most environments this is wasteful: even when everything is working well, it usually means your download spends half of the time waiting for the upload to happen.
You can fix all of these problems:
import paramiko
import boto3
import multiprocessing
import time


# Simple helper to track an activity and report the duration
# and how fast data was transferred
class SpeedTracker:
    def __init__(self):
        self._start = time.time()
        self._end = None

    def start(self):
        self._start = time.time()

    def end(self, bytes, desc):
        self._end = time.time()
        secs = self._end - self._start
        if secs > 0:
            print(f"{desc} done, {(bytes / 1048576) / secs:0.3f} MB/s")


class FastTransport(paramiko.Transport):
    # Correct issues with window size, see paramiko issue 175
    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)


def open_sftp_connection(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    transport = FastTransport((sftp_host, sftp_port))
    # Not necessary, but here for testing purposes, support either
    # password or private key auth
    if sftp_password is not None:
        transport.connect(username=sftp_username, password=sftp_password)
    else:
        pkey = paramiko.RSAKey.from_private_key_file(sftp_key)
        transport.connect(username=sftp_username, pkey=pkey)
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def pull_from_sftp(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue):
    sftp_connection = open_sftp_connection(sftp_host, int(sftp_port), sftp_username, sftp_password, sftp_key)
    sftp_file = sftp_connection.file(sftp_path, "rb")
    # Enable pipelined mode, see paramiko issue 175
    sftp_file.set_pipelined()
    # Allow the transfer to fill up data in a background thread
    sftp_file.prefetch()
    chunk_size = 8388608
    tracker = SpeedTracker()
    num = 0
    while True:
        # Download one chunk
        tracker.start()
        chunk = sftp_file.read(chunk_size)
        if len(chunk) == 0:
            # All done, time to stop work
            queue.put(None)
            sftp_file.close()
            break
        # Send the chunk off to the reader process
        num += 1
        tracker.end(len(chunk), f"Downloaded chunk #{num}")
        queue.put(chunk)


def send_chunk_to_s3(s3_connection, multipart_upload, bucket_name,
                     s3_file_path, part_number, chunk):
    # Upload one chunk to S3
    tracker = SpeedTracker()
    part = s3_connection.upload_part(
        Bucket=bucket_name,
        Key=s3_file_path,
        PartNumber=part_number,
        UploadId=multipart_upload["UploadId"],
        Body=chunk)
    tracker.end(len(chunk), f"Uploaded chunk #{part_number}")
    part_output = {"PartNumber": part_number, "ETag": part["ETag"]}
    return part_output


def transfer_file_from_sftp_to_s3(bucket_name, sftp_host, sftp_port, sftp_path,
                                  aws_s3_path, sftp_username, sftp_password, sftp_key):
    # Start a worker process to get the data from SFTP
    queue = multiprocessing.Queue(10)
    proc = multiprocessing.Process(target=pull_from_sftp, args=(sftp_host, sftp_port, sftp_username, sftp_password, sftp_key, sftp_path, queue))
    proc.start()

    # And start reading from that worker to upload its results
    s3_connection = boto3.client('s3')
    multipart_upload = s3_connection.create_multipart_upload(Bucket=bucket_name, Key=aws_s3_path)
    parts = []
    while True:
        chunk = queue.get()
        if chunk is None:
            break
        part = send_chunk_to_s3(
            s3_connection,
            multipart_upload,
            bucket_name,
            aws_s3_path,
            len(parts) + 1,
            chunk,
        )
        parts.append(part)

    # All done, clean up and finalize the multipart upload
    proc.join()
    part_info = {"Parts": parts}
    resp = s3_connection.complete_multipart_upload(
        Bucket=bucket_name,
        Key=aws_s3_path,
        UploadId=multipart_upload["UploadId"],
        MultipartUpload=part_info,
    )
    print(resp)


if __name__ == '__main__':
    sftp_host = 'TODO'
    sftp_port = int('22')
    sftp_path = 'TODO'
    sftp_username = 'TODO'
    sftp_password = 'TODO'
    sftp_key = None
    bucket_name = 'TODO'
    aws_s3_path = 'TODO'
    transfer_file_from_sftp_to_s3(
        bucket_name=bucket_name,
        sftp_host=sftp_host,
        sftp_port=22,
        sftp_path=sftp_path,
        aws_s3_path=aws_s3_path,
        sftp_username=sftp_username,
        sftp_password=sftp_password,
        sftp_key=sftp_key
    )
Tested on my local machine, I saw download and upload speeds of around 40 MB/s, which is certainly better than it was without the bug fixes.
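If it helps, a quick way to sanity-check a finished transfer (the connection, bucket and key names here are placeholders) is to compare the size S3 reports for the object against the size the SFTP server reports for the source file:

import boto3

def verify_upload(sftp_connection, sftp_path, bucket_name, aws_s3_path):
    # Size of the source file as reported by the SFTP server
    source_size = sftp_connection.stat(sftp_path).st_size
    # Size of the completed S3 object
    uploaded_size = boto3.client('s3').head_object(
        Bucket=bucket_name, Key=aws_s3_path)['ContentLength']
    print(f"source: {source_size} bytes, uploaded: {uploaded_size} bytes, "
          f"match: {source_size == uploaded_size}")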