How to use concurrent.futures.wait?

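I am learning Python and have run into a problem with concurrent.futures.wait(). Here are the details: I want the main process to hold until all child processes have completed, so I use wait() to block the main process. But I always get an error. Please help.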

def child_process(args):
    pid=os.getpid();
    while (args.len() > 0 ):
        task=args.pop(0)
        time.sleep(1+ random.random()*5)   #simulate the worker time
    print("Process "+str(pid)+" : "+task[0]+"  "+task[1])
    return


if (__name__  ==  "__main__") :

   mgr = multiprocessing.Manager()
   tasks=mgr.list()
   tasks=[[1,10],[2,20],[3,30],[4,40],[5,50],[6,60]]

    
    #executor=ProcessPoolExecutor(max_workers=3)
    f=[]
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        f.append(executor.submit(child_process,tasks))
        f.append(executor.submit(child_process,tasks))
        f.append(executor.submit(child_process,tasks))
                
#        wait(future,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)

        concurrent.futures.wait(f[0])
        concurrent.futures.wait(f[1])
        concurrent.futures.wait(f[2])
        
        executor.shutdown()

The error is:

C:\Work\python\src\test>python test.py
Traceback (most recent call last):
  File "C:\Work\python\src\test\test.py", line 70, in <module>
    concurrent.futures.wait(f[0])
  File "C:\tools\Python310\lib\concurrent\futures\_base.py", line 290, in wait
    fs = set(fs)
TypeError: 'Future' object is not iterable

What confuses me most: isn't f[0] the Future object returned by submit()?

Then I tried:

wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)

The new error is:

C:\Work\python\src\test>python test.py
C:\Work\python\src\test\test.py:68: RuntimeWarning: coroutine 'wait' was never awaited
  wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I really don't know how to solve this. Please advise. Thanks.

Regards, 艾森

A few points to note:

  • Wrapping the condition of a while statement in parentheses is redundant.
>>> a = 0
>>> while a < 10:
...     a += 1

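  • The error message says "TypeError: 'Future' object is not iterable". This means the f[0] you passed is indeed a Future object, and a single Future is not what wait() expects.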
>>> from concurrent import futures
>>> help(futures.wait)
Help on function wait in module concurrent.futures._base:

wait(fs, timeout=None, return_when='ALL_COMPLETED')
    Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
# ...

Here we can see that the fs argument actually expects a sequence of Futures, not a single Future.
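To make the point about fs concrete, here is a minimal sketch that reproduces the TypeError and shows two ways around it. The work() function is a hypothetical stand-in for the real task, and ThreadPoolExecutor is used only to keep the sketch short; wait() treats Futures from ProcessPoolExecutor the same way.

```python
import time
from concurrent import futures


def work(n):
    # Hypothetical stand-in workload: sleep briefly, then return the square.
    time.sleep(0.05)
    return n * n


with futures.ThreadPoolExecutor(max_workers=3) as executor:
    f = [executor.submit(work, n) for n in range(3)]

    # Passing a single Future raises TypeError, as in the question.
    try:
        futures.wait(f[0])
        raised = False
    except TypeError:
        raised = True

    # Wrapping the single Future in a list satisfies the `fs` parameter.
    done_one, _ = futures.wait([f[0]])

    # Passing the whole list waits for every Future in one call.
    done, not_done = futures.wait(f)

results = sorted(fut.result() for fut in done)
print(raised, len(done_one), len(not_done), results)
```

wait() returns a named tuple of two sets, done and not_done, so the results can be read from the Futures in done once it returns.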

So instead of this:

concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])

you probably want this:

concurrent.futures.wait(f)
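A side note on the second attempt in the question: the warning "coroutine 'wait' was never awaited" suggests that the bare name wait did not refer to concurrent.futures.wait there. asyncio also exports a function named wait, and it is a coroutine function, so something like `from asyncio import wait` or `from asyncio import *` would produce that warning. This is an assumption, since the question does not show its imports. A small sketch for checking which wait you are dealing with:

```python
import asyncio
import inspect
import concurrent.futures

# asyncio.wait is a coroutine function: calling it without `await`
# only creates a coroutine object and never blocks.
asyncio_wait_is_coroutine = inspect.iscoroutinefunction(asyncio.wait)

# concurrent.futures.wait is a plain function that blocks the caller.
futures_wait_is_coroutine = inspect.iscoroutinefunction(concurrent.futures.wait)

print(asyncio_wait_is_coroutine, futures_wait_is_coroutine)
```

Calling the function through its module name, as in concurrent.futures.wait(f), avoids the clash. Also note that timeout=0 makes concurrent.futures.wait return immediately instead of blocking until the Futures finish.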

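Even the explicit concurrent.futures.wait(f) call is not strictly required, because leaving the with block already waits until all worker processes have finished.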
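A minimal sketch of that behavior, assuming a hypothetical slow_task() workload and using ThreadPoolExecutor for brevity (ProcessPoolExecutor exits its with block the same way):

```python
import time
from concurrent import futures


def slow_task(n):
    # Hypothetical stand-in for real work.
    time.sleep(0.1)
    return n


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    future_list = [executor.submit(slow_task, n) for n in range(4)]
    # No explicit wait() or shutdown() call here.

# Leaving the `with` block calls executor.shutdown(wait=True),
# so every Future has finished by the time this line runs.
all_done = all(fut.done() for fut in future_list)
print(all_done)
```

An explicit wait() is still useful when you need a timeout, want return_when=FIRST_COMPLETED, or have to act on the results before the block exits.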


Here is a demo:

"""
Demo code for waiting for child processes to complete.
"""

import os
import math
import queue
import multiprocessing as mp
from concurrent import futures


def child_process(task_queue: mp.Queue):
    pid = os.getpid()

    print(f"[{pid}] Started!")
    processed_count = 0

    while True:
        try:
            item = task_queue.get_nowait()
        except queue.Empty:
            # task done
            break

        # else continue on
        # some workload
        try:
            print(f"[{pid}] {item}! = {math.factorial(item)}")

        finally:
            # tell queue we processed the item.
            task_queue.task_done()
            processed_count += 1

    print(f"[{pid}] Task done!")


def main():
    # Wrapping the code in a function makes it slightly faster,
    # because local name lookups are cheaper than global ones.

    mp_manager = mp.Manager()
    task_queue = mp_manager.Queue()

    # populate queue
    for n in range(100):
        task_queue.put_nowait(n)

    # start pool
    with futures.ProcessPoolExecutor() as executor:
        future_list = [executor.submit(child_process, task_queue) for _ in range(5)]

        # executor.shutdown(wait=True) could be used here instead.
        # Neither call is required: the executor waits for all submitted work
        # to finish when the `with` block exits, so there is also no need to
        # call executor.shutdown() manually.
        futures.wait(future_list)


if __name__ == '__main__':
    main()

Output:

[18412] Started!
[18412] 0! = 1
[4680] Started!
[18412] 1! = 1
[2664] Started!
[18412] 2! = 2
[18412] 3! = 6
[17900] Started!
[18412] 4! = 24
[18412] 5! = 120
[4680] 6! = 720
[4680] 7! = 5040
[18412] 8! = 40320
[17900] 9! = 362880
[4680] 10! = 3628800
[18412] 11! = 39916800

...

[17900] 21! = 51090942171709440000
[4680] 22! = 1124000727777607680000
[2664] 23! = 25852016738884976640000
[16792] Started!
[18412] 24! = 620448401733239439360000
[17900] 25! = 15511210043330985984000000

...

[17900] 99! = 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000
[18412] Task done!
[17900] Task done!
[16792] Task done!
[2664] Task done!
[4680] Task done!