运行 使用 python 在队列中处理一个又一个
Run process after process in a queue using python
我有一个包含 500 个进程的队列,我想通过 python 脚本 运行,我想 运行 每 N 个进程并行。
我的 python 脚本目前做了什么:
它 运行s N 个并行进程,等待所有进程终止,然后 运行s 下 N 个文件。
我需要做的:
当 N 个进程中的一个完成时,队列中的另一个进程将自动启动,而无需等待其余进程终止。
注意:我不知道每个过程需要多少时间,所以我无法在特定时间安排一个过程到 运行。
以下是我的代码。
我目前正在使用subprocess.Popen,但我并不局限于它的使用。
for i in range(0, len(queue), N):
batch = []
for _ in range(int(jobs)):
batch.append(queue.pop(0))
for process in batch:
p = subprocess.Popen([process])
ps.append(p)
for p in ps:
p.communicate()
我相信这应该有效:
import subprocess
import time
def check_for_done(l):
for i, p in enumerate(l):
if p.poll() is not None:
return True, i
return False, False
processes = list()
N = 5
queue = list()
for process in queue:
p = subprocess.Popen(process)
processes.append(p)
if len(processes) == N:
wait = True
while wait:
done, num = check_for_done(processes)
if done:
processes.pop(num)
wait = False
else:
time.sleep(0.5) # set this so the CPU does not go crazy
所以你有一个活动进程列表,check_for_done 函数循环遍历它,子进程 returns None 如果它没有完成并且它 return如果是,则为 return 代码。所以当某事被 returned 时,它应该被完成(不知道它是否成功)。然后从列表中删除该进程,允许循环添加另一个进程。
假设 python3,您可以使用 concurrent.futures
中的 ThreadPoolExecutor
,例如
$ cat run.py
from subprocess import Popen, PIPE
from concurrent.futures import ThreadPoolExecutor
def exec_(cmd):
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = proc.communicate()
#print(stdout, stderr)
def main():
with ThreadPoolExecutor(max_workers=4) as executor:
# to demonstrate it will take a batch of 4 jobs at the same time
cmds = [['sleep', '4'] for i in range(10)]
start = time.time()
futures = executor.map(exec_, cmds)
for future in futures:
pass
end = time.time()
print(f'Took {end-start} seconds')
if __name__ == '__main__':
main()
这将一次处理 4 个任务,由于任务数为 10,因此应该只需要大约 4 + 4 + 4 = 12 秒
前 4 秒 前 4 个任务
秒 4 秒 秒 4 任务
最后 4 秒 剩余 2 项任务
输出:
$ python run.py
Took 12.005989074707031 seconds
我有一个包含 500 个进程的队列,我想通过 python 脚本 运行,我想 运行 每 N 个进程并行。
我的 python 脚本目前做了什么: 它 运行s N 个并行进程,等待所有进程终止,然后 运行s 下 N 个文件。
我需要做的: 当 N 个进程中的一个完成时,队列中的另一个进程将自动启动,而无需等待其余进程终止。
注意:我不知道每个过程需要多少时间,所以我无法在特定时间安排一个过程到 运行。
以下是我的代码。 我目前正在使用subprocess.Popen,但我并不局限于它的使用。
for i in range(0, len(queue), N):
batch = []
for _ in range(int(jobs)):
batch.append(queue.pop(0))
for process in batch:
p = subprocess.Popen([process])
ps.append(p)
for p in ps:
p.communicate()
我相信这应该有效:
import subprocess
import time
def check_for_done(l):
for i, p in enumerate(l):
if p.poll() is not None:
return True, i
return False, False
processes = list()
N = 5
queue = list()
for process in queue:
p = subprocess.Popen(process)
processes.append(p)
if len(processes) == N:
wait = True
while wait:
done, num = check_for_done(processes)
if done:
processes.pop(num)
wait = False
else:
time.sleep(0.5) # set this so the CPU does not go crazy
所以你有一个活动进程列表,check_for_done 函数循环遍历它,子进程 returns None 如果它没有完成并且它 return如果是,则为 return 代码。所以当某事被 returned 时,它应该被完成(不知道它是否成功)。然后从列表中删除该进程,允许循环添加另一个进程。
假设 python3,您可以使用 concurrent.futures
中的 ThreadPoolExecutor
,例如
$ cat run.py
from subprocess import Popen, PIPE
from concurrent.futures import ThreadPoolExecutor
def exec_(cmd):
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = proc.communicate()
#print(stdout, stderr)
def main():
with ThreadPoolExecutor(max_workers=4) as executor:
# to demonstrate it will take a batch of 4 jobs at the same time
cmds = [['sleep', '4'] for i in range(10)]
start = time.time()
futures = executor.map(exec_, cmds)
for future in futures:
pass
end = time.time()
print(f'Took {end-start} seconds')
if __name__ == '__main__':
main()
这将一次处理 4 个任务,由于任务数为 10,因此应该只需要大约 4 + 4 + 4 = 12 秒
前 4 秒 前 4 个任务
秒 4 秒 秒 4 任务
最后 4 秒 剩余 2 项任务
输出:
$ python run.py
Took 12.005989074707031 seconds