SFTP to S3 AWS Lambda using Python Paramiko is extremely slow
I am streaming data from SFTP to an S3 bucket using the Paramiko library in a Python 3.8 Lambda on AWS. The script is standard: files under 6 MB are downloaded in one go, and larger files go through a multipart upload with a chunk size of roughly 6 MB. However, I have noticed it is extremely slow, around 200 KB/s for a ~47 MB file, whereas based on what I have read online it should be at least 2 MB/s (which is still considered fairly slow). There will not be many files over 1 GB, but a 200 MB file already hits the 15-minute Lambda timeout. I am running inside a VPC, but I am not familiar with its configuration; another team handles that. Is there a reason it is this slow?
Script:
import io
import logging
import math
import time

import boto3
import paramiko

# Module-level imports and logger assumed by this snippet; the methods below
# live on a transfer class (hence the self parameter).
logger = logging.getLogger(__name__)


def open_ftp_connection(self):
    """
    Opens an SFTP connection and returns the connection object.
    """
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    try:
        # Transport takes a (host, port) tuple; new channels inherit the
        # transport's default_window_size.
        transport = paramiko.Transport((self.host_name, 22))
        transport.default_window_size = 134217727
        transport.use_compression()
    except Exception:
        return 'conn_error'
    try:
        transport.connect(username=self.ftp_username, password=self.ftp_password)
    except Exception:
        return 'auth_error'
    ftp_connection = paramiko.SFTPClient.from_transport(transport)
    return ftp_connection


def transfer_chunk_from_ftp_to_s3(self, ftp_file, s3_connection, multipart_upload,
                                  bucket_name, ftp_file_path, s3_file_path,
                                  part_number, chunk_size):
    # Read one chunk from the SFTP file and upload it as a single multipart part.
    start_time = time.time()
    chunk = ftp_file.read(int(chunk_size))
    part = s3_connection.upload_part(
        Bucket=bucket_name, Key=s3_file_path, PartNumber=part_number,
        UploadId=multipart_upload["UploadId"], Body=chunk)
    end_time = time.time()
    total_seconds = end_time - start_time
    print(
        "speed is {} kb/s total seconds taken {}".format(
            math.ceil((int(chunk_size) / 1024) / total_seconds), total_seconds
        )
    )
    return {"PartNumber": part_number, "ETag": part["ETag"]}


def transfer_file_from_ftp_to_s3(self, bucket_name, ftp_file_path, s3_file_path,
                                 ftp_username, ftp_password, chunk_size, ftp_connection):
    ftp_file = ftp_connection.file(ftp_file_path, "r")
    s3_connection = boto3.client("s3")
    ftp_file_size = ftp_file._get_size()
    try:
        # Skip the transfer if an object of the same size already exists in S3.
        s3_file = s3_connection.head_object(Bucket=bucket_name, Key=s3_file_path)
        if s3_file["ContentLength"] == ftp_file_size:
            print("File Already Exists in S3 bucket")
            ftp_file.close()
            return
    except Exception:
        pass
    logger.info("file size: " + str(ftp_file_size))
    logger.info("chunk size: " + str(chunk_size))
    if ftp_file_size <= int(chunk_size):
        # Small file: upload it in one go.
        print("Transferring complete File from FTP to S3...")
        ftp_file_data = ftp_file.read()
        ftp_file_data_bytes = io.BytesIO(ftp_file_data)
        s3_connection.upload_fileobj(ftp_file_data_bytes, bucket_name, s3_file_path)
        print("Successfully Transferred file from FTP to S3!")
        ftp_file.close()
    else:
        # Large file: read sequential chunks and upload them as multipart parts.
        print("Transferring File from FTP to S3 in chunks...")
        chunk_count = int(math.ceil(ftp_file_size / float(chunk_size)))
        multipart_upload = s3_connection.create_multipart_upload(
            Bucket=bucket_name, Key=s3_file_path
        )
        logger.info("chunk count: " + str(chunk_count))
        parts = []
        for i in range(chunk_count):
            print("Transferring chunk {}...".format(i + 1))
            part = self.transfer_chunk_from_ftp_to_s3(
                ftp_file,
                s3_connection,
                multipart_upload,
                bucket_name,
                ftp_file_path,
                s3_file_path,
                i + 1,
                chunk_size,
            )
            parts.append(part)
            print("Chunk {} Transferred Successfully!".format(i + 1))
        part_info = {"Parts": parts}
        s3_connection.complete_multipart_upload(
            Bucket=bucket_name,
            Key=s3_file_path,
            UploadId=multipart_upload["UploadId"],
            MultipartUpload=part_info,
        )
        print("All chunks Transferred to S3 bucket! File Transfer successful!")
        ftp_file.close()
The code above is called like this:
self.transfer_file_from_ftp_to_s3(self.s3_bucket,self.ftp_full_file_path, s3_file_path, self.ftp_username, self.ftp_password, CHUNK_SIZE,ftp_connection)
Here is the output:
speed is 222 kb/s total seconds taken 27.759796857833862
Chunk 1 Transferred Successfully!
Transferring chunk 2...
speed is 214 kb/s total seconds taken 28.721262216567993
Chunk 2 Transferred Successfully!
Transferring chunk 3...
speed is 193 kb/s total seconds taken 31.968283653259277
Chunk 3 Transferred Successfully!
Transferring chunk 4...
speed is 196 kb/s total seconds taken 31.360466480255127
Chunk 4 Transferred Successfully!
Transferring chunk 5...
speed is 216 kb/s total seconds taken 28.545111417770386
Chunk 5 Transferred Successfully!
Transferring chunk 6...
speed is 218 kb/s total seconds taken 28.293278217315674
Chunk 6 Transferred Successfully!
Transferring chunk 7...
speed is 217 kb/s total seconds taken 28.43106746673584
Chunk 7 Transferred Successfully!
Transferring chunk 8...
speed is 200 kb/s total seconds taken 30.775285482406616
Chunk 8 Transferred Successfully!
All chunks Transferred to S3 bucket! File Transfer successful!
Edit:
Adding ftp_file.prefetch() in the transfer_file_to_S3 function seems to have improved the speed significantly for the ~47 MB file, from 202 KB/s to 2 MB/s. However, for a 1 GB file with the same chunk size, it starts out at 2 MB/s, but by around chunk 10 the speed drops back to 202 KB/s.
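For reference, a minimal sketch of where that call goes (assuming the transfer_file_from_ftp_to_s3 shown above); prefetch() makes Paramiko issue its read requests in the background instead of waiting for each response before sending the next one:

ftp_file = ftp_connection.file(ftp_file_path, "r")
# Pipeline the read requests in the background so each read() no longer
# costs a full network round trip per request.
ftp_file.prefetch()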
The solution to my problem was to use Paramiko's readv(), which reads a list of chunks and saves time because it does not need to seek. On top of the approach above I also added multithreading to download several chunks at once, and then used a multipart upload. readv() on its own brought the speed up to 2-3 MB/s, peaking around 10 MB/s, and multiple threads each got the same speed while working on different parts of the file at the same time. That allowed a 1 GB file to be read in under 6 minutes, whereas the original code only managed about 200 MB within the 15-minute limit. As for prefetch and the other fixes mentioned in the comments, I did not use them, since readv() does its own prefetching and prefetch did not help with large files.
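A rough sketch of that approach is shown below. It is only an illustration under a few assumptions: open_sftp_file is a hypothetical callable that returns a fresh Paramiko SFTPFile, so every worker thread gets its own connection instead of sharing one SSH channel; the part size stays above the 5 MB minimum that S3 multipart uploads require for every part except the last; and a single boto3 client is shared across threads, which boto3 clients support.

import math
from concurrent.futures import ThreadPoolExecutor

import boto3


def upload_part_via_readv(s3, ftp_file, part_number, offset, length, bucket, key, upload_id):
    """Read one byte range with readv() and upload it as one multipart part."""
    # readv() takes (offset, length) pairs; splitting the part into smaller
    # requests lets Paramiko pipeline them without seek()/read() round trips.
    step = 512 * 1024
    ranges = [(offset + o, min(step, length - o)) for o in range(0, length, step)]
    data = b"".join(ftp_file.readv(ranges))
    part = s3.upload_part(Bucket=bucket, Key=key, PartNumber=part_number,
                          UploadId=upload_id, Body=data)
    return {"PartNumber": part_number, "ETag": part["ETag"]}


def transfer_with_readv(open_sftp_file, file_size, bucket, key,
                        chunk_size=8 * 1024 * 1024, workers=4):
    s3 = boto3.client("s3")
    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    part_count = math.ceil(file_size / chunk_size)

    def worker(part_number, offset, length):
        # One SFTP file handle (and underlying connection) per thread, so the
        # workers do not compete for a single Paramiko channel.
        ftp_file = open_sftp_file()
        try:
            return upload_part_via_readv(s3, ftp_file, part_number, offset,
                                         length, bucket, key, mpu["UploadId"])
        finally:
            ftp_file.close()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker, i + 1, i * chunk_size,
                               min(chunk_size, file_size - i * chunk_size))
                   for i in range(part_count)]
        parts = sorted((f.result() for f in futures), key=lambda p: p["PartNumber"])

    s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"],
                                 MultipartUpload={"Parts": parts})

The per-thread connection is what actually lets the parts download in parallel; with one shared file handle the readv() requests would still be funnelled through the same SSH channel.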