python multiprocessing - select-like wait on running processes to see which ones have finished
我想 运行 15 个命令,但一次只能 运行 3 个
testme.py
import multiprocessing
import time
import random
import subprocess

def popen_wrapper(i):
    """Run `echo hi` in a subprocess, print its output, then simulate work.

    Args:
        i: task index (identifies the task; not otherwise used).
    Returns:
        The subprocess's exit code (0 on success).
    """
    p = subprocess.Popen(['echo', 'hi'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    # print() as a function: the original `print stdout` is Python 2 syntax.
    print(stdout)
    # Bug fix: `randomint` was an undefined name; the stdlib call is
    # random.randint.
    time.sleep(random.randint(5, 20))  # pretend it's doing some work
    return p.returncode
# NOTE(review): question code — the two branches below are placeholders the
# asker could not implement; as written this is not valid Python (empty
# suites), and it is quoted here verbatim as part of the question.
num_to_run = 15
max_parallel = 3
running = []
for i in range(num_to_run):
    p = multiprocessing.Process(target=popen_wrapper, args=(i,))
    running.append(p)
    p.start()
    if len(running) >= max_parallel:
        # blocking wait - join on whoever finishes first then continue
    else:
        # nonblocking wait- see if any processes is finished. If so, join the finished processes
我不确定如何实施评论:
if len(running) >= max_parallel:
# blocking wait - join on whoever finishes first then continue
else:
# nonblocking wait- see if any processes is finished. If so, join the finished processes
我不能做类似的事情:
for p in running:
p.join()
因为 running 中的第二个进程本可以完成，但我仍然阻塞在第一个进程上。
问题:如何检查running
中的进程是否在阻塞和非阻塞中都完成了(找到第一个完成的)?
寻找类似于 waitpid 的东西,也许
也许最简单的安排方法是使用 multiprocessing.Pool:
pool = mp.Pool(3)
将设置一个包含 3 个工作进程的池。然后你可以发送15个任务到池中:
for i in range(num_to_run):
pool.apply_async(popen_wrapper, args=(i,), callback=log_result)
协调 3 个工作进程和 15 项任务所需的全部机制都由 mp.Pool 负责处理。
使用mp.Pool:
import multiprocessing as mp
import time
import random
import subprocess
import logging

# Module-level logger; pool workers inherit it, and the process name in each
# record shows which PoolWorker handled a task.
logger = mp.log_to_stderr(logging.WARN)

def popen_wrapper(i):
    """Stand-in task run by a pool worker; logs and returns its task id."""
    # logger.warning(): `warn` is a deprecated alias in the logging module.
    logger.warning('echo "hi"')
    return i

def log_result(retval):
    """Callback run in the main process for each completed task's result."""
    results.append(retval)

if __name__ == '__main__':
    num_to_run = 15
    max_parallel = 3
    results = []
    # A pool of max_parallel workers: Pool queues all 15 tasks and keeps at
    # most 3 running at any given time.
    pool = mp.Pool(max_parallel)
    for i in range(num_to_run):
        pool.apply_async(popen_wrapper, args=(i,), callback=log_result)
    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for all workers to finish
    logger.warning(results)
产量
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-2] echo "hi"
[WARNING/MainProcess] [0, 2, 3, 5, 4, 6, 7, 8, 9, 10, 11, 12, 14, 13, 1]
日志语句显示哪个 PoolWorker 处理每个任务，最后的日志语句显示 MainProcess 已收到对 popen_wrapper 的 15 次调用的返回值。
如果您不想使用池,可以为任务设置 mp.Queue
,为 return 值设置 mp.Queue
:
使用 mp.Process
和 mp.Queue
s:
import multiprocessing as mp
import time
import random
import subprocess
import logging

logger = mp.log_to_stderr(logging.WARN)

# Sentinel pushed onto the task queue to tell each worker loop to exit.
SENTINEL = None

def popen_wrapper(inqueue, outqueue):
    """Worker loop: pull task ids from inqueue until SENTINEL, push results.

    Args:
        inqueue: mp.Queue of task ids, terminated by SENTINEL per worker.
        outqueue: mp.Queue that receives each processed task id.
    """
    for i in iter(inqueue.get, SENTINEL):
        # logger.warning(): `warn` is a deprecated alias in the logging module.
        logger.warning('echo "hi"')
        outqueue.put(i)

if __name__ == '__main__':
    num_to_run = 15
    max_parallel = 3
    inqueue = mp.Queue()    # tasks to do
    outqueue = mp.Queue()   # completed results
    # Exactly max_parallel worker processes share the single task queue.
    procs = [mp.Process(target=popen_wrapper, args=(inqueue, outqueue))
             for i in range(max_parallel)]
    for p in procs:
        p.start()
    for i in range(num_to_run):
        inqueue.put(i)
    for i in range(max_parallel):
        # Put sentinels in the queue to tell `popen_wrapper` to quit
        inqueue.put(SENTINEL)
    for p in procs:
        p.join()
    results = [outqueue.get() for i in range(num_to_run)]
    logger.warning(results)
请注意,如果您使用
procs = [mp.Process(target=popen_wrapper, args=(inqueue, outqueue))
for i in range(max_parallel)]
然后您强制执行 max_parallel
(例如 3)个工作进程。然后,您将所有 15 个任务发送到一个队列:
for i in range(num_to_run):
inqueue.put(i)
并让工作进程从队列中拉取任务:
def popen_wrapper(inqueue, outqueue):
for i in iter(inqueue.get, SENTINEL):
logger.warn('echo "hi"')
outqueue.put(i)
您可能还会发现 Doug Hellman's multiprocessing tutorial of interest. Among the many instructive examples you'll find there is an ActivePool
recipe,它展示了如何生成 10 个进程并限制它们(使用 mp.Semaphore
),以便在任何给定时间只有 3 个进程处于活动状态。虽然这可能具有指导意义,但它可能不是您所处情况的最佳解决方案,因为您似乎没有理由想要生成超过 3 个进程。
我想 运行 15 个命令,但一次只能 运行 3 个
testme.py
import multiprocessing
import time
import random
import subprocess

def popen_wrapper(i):
    """Run `echo hi` in a subprocess, print its output, then simulate work.

    Args:
        i: task index (identifies the task; not otherwise used).
    Returns:
        The subprocess's exit code (0 on success).
    """
    p = subprocess.Popen(['echo', 'hi'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    # print() as a function: the original `print stdout` is Python 2 syntax.
    print(stdout)
    # Bug fix: `randomint` was an undefined name; the stdlib call is
    # random.randint.
    time.sleep(random.randint(5, 20))  # pretend it's doing some work
    return p.returncode
# NOTE(review): question code — the two branches below are placeholders the
# asker could not implement; as written this is not valid Python (empty
# suites), and it is quoted here verbatim as part of the question.
num_to_run = 15
max_parallel = 3
running = []
for i in range(num_to_run):
    p = multiprocessing.Process(target=popen_wrapper, args=(i,))
    running.append(p)
    p.start()
    if len(running) >= max_parallel:
        # blocking wait - join on whoever finishes first then continue
    else:
        # nonblocking wait- see if any processes is finished. If so, join the finished processes
我不确定如何实施评论:
if len(running) >= max_parallel:
# blocking wait - join on whoever finishes first then continue
else:
# nonblocking wait- see if any processes is finished. If so, join the finished processes
我不能做类似的事情:
for p in running:
p.join()
因为 running 中的第二个进程本可以完成，但我仍然阻塞在第一个进程上。
问题:如何检查running
中的进程是否在阻塞和非阻塞中都完成了(找到第一个完成的)?
寻找类似于 waitpid 的东西,也许
也许最简单的安排方法是使用 multiprocessing.Pool:
pool = mp.Pool(3)
将设置一个包含 3 个工作进程的池。然后你可以发送15个任务到池中:
for i in range(num_to_run):
pool.apply_async(popen_wrapper, args=(i,), callback=log_result)
协调 3 个工作进程和 15 项任务所需的全部机制都由 mp.Pool 负责处理。
使用mp.Pool:
import multiprocessing as mp
import time
import random
import subprocess
import logging

# Module-level logger; pool workers inherit it, and the process name in each
# record shows which PoolWorker handled a task.
logger = mp.log_to_stderr(logging.WARN)

def popen_wrapper(i):
    """Stand-in task run by a pool worker; logs and returns its task id."""
    # logger.warning(): `warn` is a deprecated alias in the logging module.
    logger.warning('echo "hi"')
    return i

def log_result(retval):
    """Callback run in the main process for each completed task's result."""
    results.append(retval)

if __name__ == '__main__':
    num_to_run = 15
    max_parallel = 3
    results = []
    # A pool of max_parallel workers: Pool queues all 15 tasks and keeps at
    # most 3 running at any given time.
    pool = mp.Pool(max_parallel)
    for i in range(num_to_run):
        pool.apply_async(popen_wrapper, args=(i,), callback=log_result)
    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for all workers to finish
    logger.warning(results)
产量
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-1] echo "hi"
[WARNING/PoolWorker-3] echo "hi"
[WARNING/PoolWorker-2] echo "hi"
[WARNING/MainProcess] [0, 2, 3, 5, 4, 6, 7, 8, 9, 10, 11, 12, 14, 13, 1]
日志语句显示哪个 PoolWorker 处理每个任务，最后的日志语句显示 MainProcess 已收到对 popen_wrapper 的 15 次调用的返回值。
如果您不想使用池,可以为任务设置 mp.Queue
,为 return 值设置 mp.Queue
:
使用 mp.Process
和 mp.Queue
s:
import multiprocessing as mp
import time
import random
import subprocess
import logging

logger = mp.log_to_stderr(logging.WARN)

# Sentinel pushed onto the task queue to tell each worker loop to exit.
SENTINEL = None

def popen_wrapper(inqueue, outqueue):
    """Worker loop: pull task ids from inqueue until SENTINEL, push results.

    Args:
        inqueue: mp.Queue of task ids, terminated by SENTINEL per worker.
        outqueue: mp.Queue that receives each processed task id.
    """
    for i in iter(inqueue.get, SENTINEL):
        # logger.warning(): `warn` is a deprecated alias in the logging module.
        logger.warning('echo "hi"')
        outqueue.put(i)

if __name__ == '__main__':
    num_to_run = 15
    max_parallel = 3
    inqueue = mp.Queue()    # tasks to do
    outqueue = mp.Queue()   # completed results
    # Exactly max_parallel worker processes share the single task queue.
    procs = [mp.Process(target=popen_wrapper, args=(inqueue, outqueue))
             for i in range(max_parallel)]
    for p in procs:
        p.start()
    for i in range(num_to_run):
        inqueue.put(i)
    for i in range(max_parallel):
        # Put sentinels in the queue to tell `popen_wrapper` to quit
        inqueue.put(SENTINEL)
    for p in procs:
        p.join()
    results = [outqueue.get() for i in range(num_to_run)]
    logger.warning(results)
请注意,如果您使用
procs = [mp.Process(target=popen_wrapper, args=(inqueue, outqueue))
for i in range(max_parallel)]
然后您强制执行 max_parallel
(例如 3)个工作进程。然后,您将所有 15 个任务发送到一个队列:
for i in range(num_to_run):
inqueue.put(i)
并让工作进程从队列中拉取任务:
def popen_wrapper(inqueue, outqueue):
for i in iter(inqueue.get, SENTINEL):
logger.warn('echo "hi"')
outqueue.put(i)
您可能还会发现 Doug Hellman's multiprocessing tutorial of interest. Among the many instructive examples you'll find there is an ActivePool
recipe,它展示了如何生成 10 个进程并限制它们(使用 mp.Semaphore
),以便在任何给定时间只有 3 个进程处于活动状态。虽然这可能具有指导意义,但它可能不是您所处情况的最佳解决方案,因为您似乎没有理由想要生成超过 3 个进程。