How to create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submits()?
I can submit batches of concurrent.futures.ProcessPoolExecutor.submit(), where each batch may contain several submit() calls. However, I have noticed that if each batch of submissions consumes a significant amount of RAM, RAM usage can become quite inefficient; all the futures in a batch have to complete before another batch of submit() calls can be issued.
How do I create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submit() calls, until some condition is satisfied?
Test script:
#!/usr/bin/env python3

import numpy as np
from numpy.random import default_rng, SeedSequence
import concurrent.futures as cf
from itertools import count


def dojob( process, iterations, samples, rg ):
    # Do some tasks
    result = []
    for i in range( iterations ):
        a = rg.standard_normal( samples )
        b = rg.integers( -3, 3, samples )
        mean = np.mean( a + b )
        result.append( ( i, mean ) )
    return { process : result }


if __name__ == '__main__':

    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator
    ss = SeedSequence( 1234567890 )
    child_seeds = ss.spawn( cpus )
    rg_streams = [ default_rng(s) for s in child_seeds ]

    # Perform concurrent analysis by batches
    counter = count( start=0, step=1 )

    # Serial Run of dojob
    process = next( counter )
    for cpu in range( cpus ):
        process = next( counter )
        rg = rg_streams[ cpu ]
        rdict = dojob( process, iterations, samples, rg )
        print( 'rdict', rdict )

    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor( max_workers=cpus ) as executor:
        while True:
            for cpu in range( cpus ):
                process = next( counter )
                rg = rg_streams[ cpu ]
                futures.append( executor.submit( dojob, process, iterations, samples, rg ) )
            for future in cf.as_completed( futures ):
                # Do some post processing
                r = future.result()
                for k, v in r.items():
                    if len( results ) < 5000:
                        results.append( np.std( v ) )
                        print( k, len(results) )
            if len(results) <= 100:  # Put a huge number to simulate continuous streaming
                futures = []
                child_seeds = child_seeds[0].spawn( cpus )
                rg_streams = [ default_rng(s) for s in child_seeds ]
            else:
                break

    print( '\n*** Concurrent Analyses Ended ***' )
To expand on my comment, how about something like this, using a completion callback and a threading.Condition? I also took the liberty of adding a progress indicator.
EDIT: I refactored this into a neat function you can pass your desired concurrency and queue depth into, along with a function that generates new jobs and another function that processes each result and lets the executor know whether you have had enough.
import concurrent.futures as cf
import threading
import time
from itertools import count

import numpy as np
from numpy.random import SeedSequence, default_rng


def dojob(process, iterations, samples, rg):
    # Do some tasks
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3, 3, samples)
        mean = np.mean(a + b)
        result.append((i, mean))
    return {process: result}


def execute_concurrently(cpus, max_queue_length, get_job_fn, process_result_fn):
    running_futures = set()
    jobs_complete = 0
    job_cond = threading.Condition()
    all_complete_event = threading.Event()

    def on_complete(future):
        nonlocal jobs_complete
        if process_result_fn(future.result()):
            all_complete_event.set()
        running_futures.discard(future)
        jobs_complete += 1
        with job_cond:
            job_cond.notify_all()

    time_since_last_status = 0
    start_time = time.time()
    with cf.ProcessPoolExecutor(cpus) as executor:
        while True:
            while len(running_futures) < max_queue_length:
                fn, args = get_job_fn()
                fut = executor.submit(fn, *args)
                fut.add_done_callback(on_complete)
                running_futures.add(fut)

            with job_cond:
                job_cond.wait()

            if all_complete_event.is_set():
                break

            if time.time() - time_since_last_status > 1.0:
                rps = jobs_complete / (time.time() - start_time)
                print(
                    f"{len(running_futures)} running futures on {cpus} CPUs, "
                    f"{jobs_complete} complete. RPS: {rps:.2f}"
                )
                time_since_last_status = time.time()


def main():
    ss = SeedSequence(1234567890)
    counter = count(start=0, step=1)
    iterations = 10000
    samples = 1000
    results = []

    def get_job():
        seed = ss.spawn(1)[0]
        rg = default_rng(seed)
        process = next(counter)
        return dojob, (process, iterations, samples, rg)

    def process_result(result):
        for k, v in result.items():
            results.append(np.std(v))
        if len(results) >= 10000:
            return True  # signal we're complete

    execute_concurrently(
        cpus=16,
        max_queue_length=20,
        get_job_fn=get_job,
        process_result_fn=process_result,
    )


if __name__ == "__main__":
    main()
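As a usage note (my own sketch, not part of the original answer): because execute_concurrently only depends on a (fn, args) generator and a result handler, the stop condition can be anything. Assuming the imports and definitions from the listing above, a hypothetical pair that stops after a wall-clock budget might look like this (make_time_limited_runner and time_limit_s are illustrative names, not from the answer):

# Hypothetical sketch; assumes dojob, execute_concurrently and the imports
# (time, count, np, SeedSequence, default_rng) from the listing above.
def make_time_limited_runner(time_limit_s=10.0):
    ss = SeedSequence(1234567890)
    counter = count(start=0, step=1)
    deadline = time.time() + time_limit_s
    results = []

    def get_job():
        rg = default_rng(ss.spawn(1)[0])          # independent RNG stream per job
        return dojob, (next(counter), 10000, 1000, rg)

    def process_result(result):
        for _, v in result.items():
            results.append(np.std(v))
        return time.time() >= deadline            # True => stop submitting new jobs

    return get_job, process_result, results


if __name__ == "__main__":
    get_job, process_result, results = make_time_limited_runner(time_limit_s=10.0)
    execute_concurrently(cpus=4, max_queue_length=8,
                         get_job_fn=get_job, process_result_fn=process_result)
    print(f"collected {len(results)} results in the time budget")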
The posted answer works. Thanks to its author. After testing it, I would like to recommend two amendments that I think are worth considering and implementing.
Amendment 1: To cancel the execution of the Python script early, Ctrl+C has to be used. Unfortunately, doing so does not terminate the concurrent.futures.ProcessPoolExecutor() processes that are executing the function dojob(). This issue becomes more noticeable when dojob() takes a long time to finish; the situation can be simulated by making the sample size in the script large (e.g. samples = 100000). The issue can be seen by running the terminal command ps -ef | grep python. Also, if dojob() consumes a significant amount of RAM, the memory used by these concurrent processes is not released until they are killed manually (e.g. kill -9 [PID]). To address these issues, the following amendment is needed.
with job_cond:
    job_cond.wait()
should be changed to:
try:
    with job_cond:
        job_cond.wait()
except KeyboardInterrupt:
    # Cancel running futures
    for future in running_futures:
        _ = future.cancel()
    # Ensure concurrent.futures.executor jobs really do finish.
    _ = cf.wait(running_futures, timeout=None)
So when using Ctrl+C, press it only once first. Next, give the futures in running_futures some time to be cancelled. This may take a few seconds to complete, depending on the resource requirements of dojob(). You can watch the CPU activity drop to zero in your task manager or system monitor, or hear the CPU cooling fan spin down. Note that the RAM used has not been released yet. After that, press Ctrl+C again; this should allow all the concurrent processes to exit cleanly, thereby releasing the RAM they were using.
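As a side note (an alternative sketch of my own, not from the answer above), on Python 3.9+ the executor itself can be asked to drop pending work via shutdown(cancel_futures=True). Like future.cancel(), this only cancels jobs that have not started yet, so any dojob() calls already running still finish before the worker processes exit. Reusing the names from execute_concurrently, the submission loop could instead be wrapped like this:

# Alternative sketch, assuming Python 3.9+ for cancel_futures.
executor = cf.ProcessPoolExecutor(cpus)
try:
    while True:
        while len(running_futures) < max_queue_length:
            fn, args = get_job_fn()
            fut = executor.submit(fn, *args)
            fut.add_done_callback(on_complete)
            running_futures.add(fut)
        with job_cond:
            job_cond.wait()
        if all_complete_event.is_set():
            break
except KeyboardInterrupt:
    print("Interrupted: cancelling pending jobs, waiting for running ones...")
finally:
    # Cancel queued futures; wait for running ones, then release the workers.
    executor.shutdown(wait=True, cancel_futures=True)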
Amendment 2: Currently, the inner while-loop dictates that jobs are submitted continuously, as fast as the CPU of the "MainThread" allows. Realistically, there is no benefit to submitting more jobs than there are CPUs available in the pool; doing so only consumes CPU resources of the "MainThread" unnecessarily. To regulate continuous job submission, a new submit_job threading.Event() object can be used.
First, define such an object and set its value to True:
submit_job = threading.Event()
submit_job.set()
Next, add this condition and .wait() method at the end of the inner while-loop:
with cf.ProcessPoolExecutor(cpus) as executor:
    while True:
        while len(running_futures) < max_queue_length:
            fn, args = get_job_fn()
            fut = executor.submit(fn, *args)
            fut.add_done_callback(on_complete)
            running_futures.add(fut)
            if len(running_futures) >= cpus:  # Add this line
                submit_job.clear()            # Add this line
                submit_job.wait()             # Add this line
Finally, change the on_complete(future) callback to:
def on_complete(future):
    nonlocal jobs_complete
    if process_result_fn(future.result()):
        all_complete_event.set()
    running_futures.discard(future)
    if len(running_futures) < cpus:  # add this conditional setting
        submit_job.set()             # add this conditional setting
    jobs_complete += 1
    with job_cond:
        job_cond.notify_all()
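For reference, here is a sketch of execute_concurrently with both amendments folded in (the status printout is omitted for brevity). It is only a recombination of the fragments above, not a tested implementation:

def execute_concurrently(cpus, max_queue_length, get_job_fn, process_result_fn):
    running_futures = set()
    jobs_complete = 0
    job_cond = threading.Condition()
    all_complete_event = threading.Event()
    submit_job = threading.Event()    # Amendment 2: throttles job submission
    submit_job.set()

    def on_complete(future):
        nonlocal jobs_complete
        if process_result_fn(future.result()):
            all_complete_event.set()
        running_futures.discard(future)
        if len(running_futures) < cpus:    # Amendment 2: allow submissions again
            submit_job.set()
        jobs_complete += 1
        with job_cond:
            job_cond.notify_all()

    with cf.ProcessPoolExecutor(cpus) as executor:
        while True:
            while len(running_futures) < max_queue_length:
                fn, args = get_job_fn()
                fut = executor.submit(fn, *args)
                fut.add_done_callback(on_complete)
                running_futures.add(fut)
                if len(running_futures) >= cpus:    # Amendment 2: pause submission
                    submit_job.clear()
                    submit_job.wait()
            try:
                with job_cond:
                    job_cond.wait()
            except KeyboardInterrupt:    # Amendment 1: let Ctrl+C cancel futures
                for future in running_futures:
                    _ = future.cancel()
                _ = cf.wait(running_futures, timeout=None)
            if all_complete_event.is_set():
                break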