
Recursively move files from SFTP to S3 preserving structure

I'm trying to recursively move files from an SFTP server to S3, possibly using boto3. I also want to preserve the folder/file structure. I was thinking of doing something like this:

import pysftp

private_key = "/mnt/results/sftpkey"

srv = pysftp.Connection(host="server.com", username="user1", private_key=private_key)

srv.get_r("/mnt/folder", "./output_folder")

Then take those files and upload them to S3 using boto3. However, the folders and files on the server are numerous, deeply nested, and large, so my machine eventually runs out of memory and disk space. I was thinking of a script where I download a single file, upload it, then delete it and repeat.

I know this would take a long time to finish, but I could run it as a job without running out of space and without keeping my machine on the whole time. Has anyone done something similar? Thanks for your help!

You have to do it file-by-file.

Start with the recursive download code here:

After each sftp.get, do the S3 upload and delete the local file.
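
For example, the per-file loop could look roughly like this (a minimal sketch only; the connection details, bucket name, and file list are placeholders taken from the question, and in practice the list of (remote path, S3 key) pairs would come from the recursive walk):

import os
import boto3
import pysftp

srv = pysftp.Connection(host="server.com", username="user1",
                        private_key="/mnt/results/sftpkey")
s3 = boto3.client("s3")
# Placeholder list; build this by walking the remote tree recursively
files_to_copy = [("/mnt/folder/sub/file.txt", "sub/file.txt")]

for remote_path, s3_key in files_to_copy:
    srv.get(remote_path, "_temp_")                       # download one file
    s3.upload_file("_temp_", "example-bucket", s3_key)   # upload it to S3
    os.unlink("_temp_")                                  # free the disk space again

srv.close()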

Actually, you can even copy the files from SFTP to S3 without storing them locally:
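
A rough sketch of that streaming variant, assuming the same placeholder server and bucket as above: srv.open() returns a file-like object for the remote file, and boto3's upload_fileobj() can read from it directly, so nothing is written to the local disk.

import boto3
import pysftp

srv = pysftp.Connection(host="server.com", username="user1",
                        private_key="/mnt/results/sftpkey")
s3 = boto3.client("s3")

# Stream one remote file straight into S3; no temporary file on disk
with srv.open("/mnt/folder/sub/file.txt", "rb") as remote_file:
    s3.upload_fileobj(remote_file, "example-bucket", "sub/file.txt")

srv.close()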

If you can't (or don't want to) download all the files at once before sending them to S3, then you need to download them one at a time.

Further, from there you need to build the list of files to download, then work through that list, transferring one file at a time to the local machine and then sending it on to S3.

A very simple version of that looks like this:

import pysftp
import stat
import boto3
import os
import json

# S3 bucket and prefix to upload to
target_bucket = "example-bucket"
target_prefix = ""
# Root FTP folder to sync
base_path = "./"
# Both base_path and target_prefix should end in a "/"
# Or, for the prefix, be empty for the root of the bucket
srv = pysftp.Connection(
    host="server.com", 
    username="user1", 
    private_key="/mnt/results/sftpkey",
)

if os.path.isfile("all_files.json"):
    # No need to cache files more than once. This lets us restart 
    # on a failure, though really we should be caching files in 
    # something more robust than just a json file
    with open("all_files.json") as f:
        all_files = json.load(f)
else:
    # No local cache, go ahead and get the files
    print("Need to get list of files...")
    todo = [(base_path, target_prefix)]
    all_files = []

    while len(todo):
        cur_dir, cur_prefix = todo.pop(0)
        print("Listing " + cur_dir)
        for cur in srv.listdir_attr(cur_dir):
            if stat.S_ISDIR(cur.st_mode):
                # A directory, so walk into it
                todo.append((cur_dir + cur.filename + "/", cur_prefix + cur.filename + "/"))
            else:
                # A file, just add it to our cache
                all_files.append([cur_dir + cur.filename, cur_prefix + cur.filename])

    # Save the cache out to disk    
    with open("all_files.json", "w") as f:
        json.dump(all_files, f)

# And now, for every file in the cache, download it
# and turn around and upload it to S3
s3 = boto3.client('s3')
while len(all_files):
    ftp_file, s3_name = all_files.pop(0)

    print("Downloading " + ftp_file)
    srv.get(ftp_file, "_temp_")
    print("Uploading " + s3_name)
    s3.upload_file("_temp_", target_bucket, s3_name)

    # Clean up, and update the cache with one less file
    os.unlink("_temp_")
    with open("all_files.json", "w") as f:
        json.dump(all_files, f)

srv.close()

Error checking and speed improvements are obviously possible.
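
For instance, a basic retry wrapper around the download/upload step (a sketch, not part of the original script; the function name and arguments are made up here) would keep a transient network error from killing the whole job:

import time

def transfer_with_retries(srv, s3, ftp_file, bucket, s3_name, attempts=3):
    """Try the download/upload a few times before giving up on one file."""
    for attempt in range(1, attempts + 1):
        try:
            srv.get(ftp_file, "_temp_")
            s3.upload_file("_temp_", bucket, s3_name)
            return True
        except Exception as err:  # real code should catch specific IOError/ClientError types
            print("Attempt %d failed for %s: %s" % (attempt, ftp_file, err))
            time.sleep(2 ** attempt)  # crude exponential backoff
    return False

The main loop above would then call transfer_with_retries(srv, s3, ftp_file, target_bucket, s3_name) and decide whether to skip the file or abort the whole run when it returns False.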