如何创建 Python 的 concurrent.futures.ProcessPoolExecutor.submits() 的连续流?

How to create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submits()?

我可以提交 concurrent.futures.ProcessPoolExecutor.submits() 个批次,其中每个批次可能包含几个 submit()。但是,我注意到如果每批提交消耗大量 RAM,那么 RAM 使用效率可能会相当低;需要等待批次中的所有期货完成才能提交另一批submit()

如何创建连续的 Python 的 concurrent.futures.ProcessPoolExecutor.submit() 流,直到满足某些条件?

测试脚本:

#!/usr/bin/env python3

import numpy as np
from numpy.random import default_rng, SeedSequence
import concurrent.futures as cf
from itertools import count


def dojob( process, iterations, samples, rg ):
    # Do some tasks
    result = []
    for i in range( iterations ):
        a = rg.standard_normal( samples )
        b = rg.integers( -3, 3, samples )
        mean = np.mean( a + b )
        result.append( ( i, mean ) )
        return { process : result }

if __name__ == '__main__':

    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator
    ss = SeedSequence( 1234567890 )
    child_seeds = ss.spawn( cpus )
    rg_streams = [ default_rng(s) for s in child_seeds ]

    # Peform concurrent analysis by batches
    counter = count( start=0, step=1 )

    # Serial Run of dojob
    process = next( counter )
    for cpu in range( cpus ):
        process = next( counter )
        rg = rg_streams[ cpu ]
        rdict = dojob( process, iterations, samples, rg )
    print( 'rdict', rdict )

    
    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor( max_workers=cpus ) as executor:

        while True:
            
            for cpu in range( cpus ):
                process = next( counter )
                rg = rg_streams[ cpu ]
                futures.append( executor.submit( dojob, process, iterations, samples, rg ) )
                
            for future in cf.as_completed( futures ):
                # Do some post processing
                r = future.result()
                for k, v in r.items():
                    if len( results ) < 5000:
                        results.append( np.std( v ) )
                        print( k, len(results) )

            if len(results) <= 100: #Put a huge number to simulate continuous streaming 
                futures = []
                child_seeds = child_seeds[0].spawn( cpus )
                rg_streams = [ default_rng(s) for s in child_seeds ]
            else:
                break

    print( '\n*** Concurrent Analyses Ended ***' )

为了扩展我的评论,使用完成回调和 threading.Condition 这样的事情怎么样?我也冒昧地添加了一个进度指示器。

编辑: 我将其重构为一个简洁的函数,您传递所需的并发性和队列深度,以及一个生成新作业的函数和另一个处理结果的函数并让遗嘱执行人知道您是否受够了。

import concurrent.futures as cf
import threading
import time
from itertools import count

import numpy as np
from numpy.random import SeedSequence, default_rng


def dojob(process, iterations, samples, rg):
    # Do some tasks
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3, 3, samples)
        mean = np.mean(a + b)
        result.append((i, mean))
    return {process: result}


def execute_concurrently(cpus, max_queue_length, get_job_fn, process_result_fn):
    running_futures = set()
    jobs_complete = 0
    job_cond = threading.Condition()
    all_complete_event = threading.Event()

    def on_complete(future):
        nonlocal jobs_complete
        if process_result_fn(future.result()):
            all_complete_event.set()
        running_futures.discard(future)
        jobs_complete += 1
        with job_cond:
            job_cond.notify_all()

    time_since_last_status = 0
    start_time = time.time()
    with cf.ProcessPoolExecutor(cpus) as executor:
        while True:
            while len(running_futures) < max_queue_length:
                fn, args = get_job_fn()
                fut = executor.submit(fn, *args)
                fut.add_done_callback(on_complete)
                running_futures.add(fut)

            with job_cond:
                job_cond.wait()

            if all_complete_event.is_set():
                break

            if time.time() - time_since_last_status > 1.0:
                rps = jobs_complete / (time.time() - start_time)
                print(
                    f"{len(running_futures)} running futures on {cpus} CPUs, "
                    f"{jobs_complete} complete. RPS: {rps:.2f}"
                )
                time_since_last_status = time.time()


def main():
    ss = SeedSequence(1234567890)
    counter = count(start=0, step=1)
    iterations = 10000
    samples = 1000
    results = []

    def get_job():
        seed = ss.spawn(1)[0]
        rg = default_rng(seed)
        process = next(counter)
        return dojob, (process, iterations, samples, rg)

    def process_result(result):
        for k, v in result.items():
            results.append(np.std(v))
        if len(results) >= 10000:
            return True  # signal we're complete

    execute_concurrently(
        cpus=16,
        max_queue_length=20,
        get_job_fn=get_job,
        process_result_fn=process_result,
    )


if __name__ == "__main__":
    main()

发布的答案有效。感谢他。经过测试,我想推荐两个我认为值得考虑和实施的修正案。

修改1:要提前取消python脚本的执行,Ctrl+C 必须使用。不幸的是,这样做不会终止正在执行函数 dojob()concurrent.futures.ProcessPoolExecutor() 进程。当完成 dojob() 的时间很长时,这个问题会变得更加明显;可以通过使脚本中的样本量较大(例如 samples = 100000)来模拟这种情况。执行终端命令 ps -ef | grep python 时可以看到此问题。此外,如果 dojob() 消耗大量 RAM,则这些并发进程使用的内存不会被释放,直到并发进程被手动终止(例如 kill -9 [PID])。为了解决这些问题,需要进行以下修正。

with job_cond:
    job_cond.wait()

应改为:

try:
    with job_cond:
        job_cond.wait()
except KeyboardInterrupt:
    # Cancel running futures
    for future in running_futures:
        _ = future.cancel()
    # Ensure concurrent.futures.executor jobs really do finish.
    _ = cf.wait(running_futures, timeout=None)

所以要用Ctrl+C时,只要先按一次就可以了。接下来,给 running_futures 中的期货取消一些时间。这可能需要几秒到几秒才能完成;这取决于 dojob() 的资源需求。您可以在任务管理器或系统监视器中看到 CPU activity 降为零,或者听到 cpu 冷却风扇的高转速声音减少。请注意,使用的 RAM 尚未释放。此后,再次按 Ctrl+C,这应该允许所有并发进程干净退出,从而释放使用的 RAM。

修正案 2: 目前,内部 while 循环规定必须以 cpu“mainThread”允许的速度连续提交作业。实际上,提交的作业多于 cpus 池中可用的 cpus 并没有什么好处。这样做只会不必要地消耗来自主处理器“MainThread”的 cpu 资源。要规范连续作业提交,可以使用新的 submit_job threading.Event() 对象。

首先,定义这样一个对象并将其值设置为True,其中:

submit_job = threading.Event()
submit_job.set()

接下来,在内部 while 循环的末尾添加此条件和 .wait() 方法:

with cf.ProcessPoolExecutor(cpus) as executor:
    while True:
        while len(running_futures) < max_queue_length:
            fn, args = get_job_fn()
            fut = executor.submit(fn, *args)
            fut.add_done_callback(on_complete)
            running_futures.add(fut)
            if len(running_futures) >= cpus: # Add this line
                submit_job.clear()           # Add this line
            submit_job.wait()                # Add this line

最后将 on_complete(future) 回调更改为:

def on_complete(future):
    nonlocal jobs_complete
    if process_result_fn(future.result()):
        all_complete_event.set()
    running_futures.discard(future)
    if len(running_futures) < cpus: # add this conditional setting
        submit_job.set()            # add this conditional setting
    jobs_complete += 1
    with job_cond:
        job_cond.notify_all()