Recursively move files from SFTP to S3 preserving structure
I'm trying to recursively move files from an SFTP server to S3, possibly using boto3. I also want to preserve the folder/file structure. I was planning to do it like this:
import pysftp
private_key = "/mnt/results/sftpkey"
srv = pysftp.Connection(host="server.com", username="user1", private_key=private_key)
srv.get_r("/mnt/folder", "./output_folder")
Then I would take those files and upload them to S3 using boto3. However, the folders and files on the server are numerous, deeply nested, and large, so my machine eventually runs out of memory and disk space. I'm thinking of a script where I download a single file, upload that single file, delete it, and repeat.
I know this would take a long time to finish, but I could run it as a job without running out of space and without having to leave my machine on the whole time. Has anyone done something similar? Any help is appreciated!
You have to do this file-by-file.
Start with the recursive download code here:
After each sftp.get, perform the S3 upload and delete the local file, as in the sketch below.
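A rough illustration of that per-file hook, reusing the connection details from the question (the remote path, bucket name, and S3 key below are placeholders, not from the original post):
import os
import boto3
import pysftp

s3 = boto3.client("s3")
srv = pysftp.Connection(host="server.com", username="user1",
                        private_key="/mnt/results/sftpkey")

# Placeholder remote path and S3 key, for illustration only
remote_path = "/mnt/folder/example.txt"
s3_key = "folder/example.txt"

# Download to a temporary local file...
srv.get(remote_path, "_temp_")
# ...upload it to S3...
s3.upload_file("_temp_", "example-bucket", s3_key)
# ...and delete the local copy so disk usage stays flat
os.unlink("_temp_")

srv.close()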
In fact, you can even copy the files from SFTP to S3 without storing them locally at all:
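One way to do that (a minimal sketch, assuming the same connection details as the question; the remote path and S3 key are placeholders) is to open the remote file as a file-like object and hand it to boto3's upload_fileobj, which reads it in chunks, so the file never has to exist on the local machine:
import boto3
import pysftp

s3 = boto3.client("s3")
srv = pysftp.Connection(host="server.com", username="user1",
                        private_key="/mnt/results/sftpkey")

# Placeholder remote path and S3 key, for illustration only
with srv.open("/mnt/folder/example.txt", "rb") as remote_file:
    # upload_fileobj streams the file-like object in chunks,
    # so nothing is written to local disk
    s3.upload_fileobj(remote_file, "example-bucket", "folder/example.txt")

srv.close()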
If you can't (or don't want to) download all of the files at once before sending them to S3, then you need to download them one at a time.
Further, from there, you need to build a list of files to download, then work through that list, transferring one file at a time to your local machine and then sending it on to S3.
A very simple version of this would look something like this:
import pysftp
import stat
import boto3
import os
import json

# S3 bucket and prefix to upload to
target_bucket = "example-bucket"
target_prefix = ""

# Root SFTP folder to sync
base_path = "./"

# Both base_path and target_prefix should end in a "/"
# Or, for the prefix, be empty for the root of the bucket

srv = pysftp.Connection(
    host="server.com",
    username="user1",
    private_key="/mnt/results/sftpkey",
)

if os.path.isfile("all_files.json"):
    # No need to cache files more than once. This lets us restart
    # on a failure, though really we should be caching files in
    # something more robust than just a json file
    with open("all_files.json") as f:
        all_files = json.load(f)
else:
    # No local cache, go ahead and get the files
    print("Need to get list of files...")
    todo = [(base_path, target_prefix)]
    all_files = []
    while len(todo):
        cur_dir, cur_prefix = todo.pop(0)
        print("Listing " + cur_dir)
        for cur in srv.listdir_attr(cur_dir):
            if stat.S_ISDIR(cur.st_mode):
                # A directory, so walk into it
                todo.append((cur_dir + cur.filename + "/", cur_prefix + cur.filename + "/"))
            else:
                # A file, just add it to our cache
                all_files.append([cur_dir + cur.filename, cur_prefix + cur.filename])
    # Save the cache out to disk
    with open("all_files.json", "w") as f:
        json.dump(all_files, f)

# And now, for every file in the cache, download it
# and turn around and upload it to S3
s3 = boto3.client('s3')
while len(all_files):
    ftp_file, s3_name = all_files.pop(0)
    print("Downloading " + ftp_file)
    srv.get(ftp_file, "_temp_")
    print("Uploading " + s3_name)
    s3.upload_file("_temp_", target_bucket, s3_name)
    # Clean up, and update the cache with one less file
    os.unlink("_temp_")
    with open("all_files.json", "w") as f:
        json.dump(all_files, f)

srv.close()
Error checking and speed improvements are obviously possible.
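For instance, the bare get/upload_file calls in the main loop could be wrapped in a retry helper; a minimal sketch (the helper name, attempt count, and delay below are illustrative assumptions, not part of the original answer):
import os
import time

def transfer_with_retry(srv, s3, ftp_file, s3_name, bucket, attempts=3, delay=5):
    # Download one file from SFTP and upload it to S3, retrying on failure.
    # The attempt count and delay are arbitrary illustrative defaults.
    for attempt in range(1, attempts + 1):
        try:
            srv.get(ftp_file, "_temp_")
            s3.upload_file("_temp_", bucket, s3_name)
            return
        except Exception as exc:
            print("Attempt %d for %s failed: %s" % (attempt, ftp_file, exc))
            if attempt == attempts:
                raise
            time.sleep(delay)
        finally:
            # Always remove the temporary file, whether or not the upload succeeded
            if os.path.exists("_temp_"):
                os.unlink("_temp_")
The main loop above could then call transfer_with_retry(srv, s3, ftp_file, s3_name, target_bucket) in place of the direct srv.get / s3.upload_file / os.unlink sequence.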