How to run AWS S3 sync command concurrently for different prefixes using Python

I'm trying to write a Python script that uses the subprocess module to copy files from one S3 bucket to another. To improve performance, I want to run separate sync commands for different prefixes in parallel.

This is what I have tried so far, but the script never terminates and I'm not sure whether the subprocesses are actually running concurrently.

import subprocess

prefix = ['prefix1','prefix2','prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'
commands = []

for p in prefix:
    command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
    commands.append(command)

procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]

for p in procs:
    p.wait()

Is there a better way to do this? Any help is appreciated.

Because you are passing in subprocess.PIPE, each aws process will block once its output fills the pipe buffer and nothing reads from it, so your p.wait() calls never return. You need a separate process to read the output of each aws instance. One possibility is to use Python's multiprocessing:

import subprocess
import multiprocessing

def worker(command, queue):
    # No shell here; an argv-style command list avoids the extra shell process
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
    # Read from the aws command's output and send each line off to the
    # queue as soon as it's available
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))

    # Reap the child process now that its output is exhausted
    proc.wait()

    # Notify the listener that we're done
    queue.put(None)


def main():
    # Move all work to a function so it's multiprocessing safe, see
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # Note: add trailing slashes so source_bucket + prefix forms a valid S3 URI
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'

    # Create a queue to gather final messages from each worker
    queue = multiprocessing.Queue()
    procs = []

    for p in prefixes:
        # Pass in --no-progress so the progress messages aren't shown;
        # displaying them is complicated and requires quite a bit of work
        # to make sure they don't interfere with each other.
        # Correct the command syntax here to use all of the variables.
        # The prefix needs to be appended to the dest URI as well so the
        # same directory structure is maintained on the destination.
        # Use an argv-style list so we can avoid bringing the shell into this.
        command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
        # Hand off the work to a worker to read from the pipe to prevent each
        # spawned aws instance from blocking
        proc = multiprocessing.Process(target=worker, args=(command, queue))
        proc.start()
        procs.append(proc)

    # Read from the Queue to show the output
    left = len(procs)
    while left > 0:
        msg = queue.get()
        if msg is None:
            # This means a worker is done
            left -= 1
        else:
            # Print from this one process so output lines from different
            # workers can't interleave
            print(msg)
    
    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()
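
As a side note: if you don't actually need to capture the sync output, a simpler variant keeps the shape of your original script. This is only a sketch under that assumption (the bucket names and prefixes are placeholders): discard each child's stdout with subprocess.DEVNULL so nothing can block on a full pipe, and then plain wait() calls are enough.

import subprocess

prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
source_bucket = 's3://source/'
dest_bucket = 's3://dest/'

# Build argv-style commands so no shell is involved
commands = [
    ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
    for p in prefixes
]

# Discard stdout so the children never block on a full pipe;
# stderr is left alone so errors still show up on the terminal
procs = [subprocess.Popen(cmd, stdout=subprocess.DEVNULL) for cmd in commands]

# With no pipes to drain, waiting is safe
for proc in procs:
    proc.wait()

The trade-off is that you lose the live per-line output that the multiprocessing version streams back through the queue.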