How to run AWS S3 sync command concurrently for different prefixes using Python
I'm trying to write a Python script that copies files from one S3 bucket to another using the subprocess module. To improve performance, I want to run separate sync commands for different prefixes in parallel.
So far, the script I've tried doesn't terminate, and I'm not sure whether the subprocesses are actually running concurrently.
import subprocess

prefix = ['prefix1', 'prefix2', 'prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'

commands = []

for p in prefix:
    command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
    commands.append(command)

procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]

for p in procs:
    p.wait()
Is there a better way to do this? Any help is appreciated.
Because you're passing in subprocess.PIPE, the different processes will block while waiting for output once the pipe buffer fills up, since nothing is reading from it. You need to run a separate process to communicate with each aws instance. One possibility is to use Python's multiprocessing module:
import subprocess
import multiprocessing

def worker(command, queue):
    # Don't use shell here, we can avoid the overhead
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
    # Read from the aws command output and send it off the queue as soon
    # as it's available
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))
    # Notify the listener that we're done
    queue.put(None)

def main():
    # Move all work to a function so it's multiprocessing safe, see
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # Note: Adding trailing slash so "source_bucket" + "prefix" is a valid S3 URI
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'

    # Create a queue to gather final messages from each worker
    queue = multiprocessing.Queue()

    procs = []
    for p in prefixes:
        # Pass in --no-progress so the progress messages aren't shown;
        # displaying those messages is complicated, and requires quite a bit
        # of work to make sure they don't interfere with each other.
        # Correct the command syntax here to use all the variables.
        # The prefix needs to be appended to the dest URI as well so the same
        # structure is maintained.
        # Use an argv-style call here so we can avoid bringing the shell into this.
        command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
        # Hand off the work to a worker that reads from the pipe to prevent each
        # spawned aws instance from blocking
        proc = multiprocessing.Process(target=worker, args=(command, queue))
        proc.start()
        procs.append(proc)

    # Read from the queue to show the output
    left = len(procs)
    while left > 0:
        msg = queue.get()
        if msg is None:
            # This means a worker is done
            left -= 1
        else:
            # Just print the output; doing it in one process prevents any
            # collision possibilities
            print(msg)

    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()
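One thing the worker above doesn't do is check whether its aws process actually succeeded. If you care about failures (a missing prefix, a permissions error, and so on), one option is to wait on the process after its output is drained and push the exit code onto the queue. This is only a minimal sketch of that idea; the (prefix, returncode) tuple is an assumption about how you might want to report status, not part of the original answer, and the loop in main() would need to treat those tuples as the "done" signal instead of None:

import subprocess

def worker(command, prefix, queue):
    # Same idea as before: no shell, and read lines as they arrive so the
    # pipe never fills up and blocks the aws process.
    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            bufsize=1, universal_newlines=True)
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))
    # stdout is exhausted, so the process is finishing; wait() reaps it
    # and returns the exit code.
    returncode = proc.wait()
    # Report completion along with the exit status instead of a bare None,
    # so the listener can flag any prefix whose sync returned non-zero.
    queue.put(("done", prefix, returncode))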