如何使用concurrent.future.wait?
How to use concurrent.future.wait?
我正在学习 python,我在 concurrent.futures.wait() 上遇到了一些问题 -- 这是详细信息--
我想让主进程保持到所有 child 进程完成。所以我使用 wait() 来阻止主进程。但是我总是出错,请帮助。
def child_process(args):
pid=os.getpid();
while (args.len() > 0 ):
task=args.pop(0)
time.sleep(1+ random.random()*5) #simulate the worker time
print("Process "+str(pid)+" : "+task[0]+" "+task[1])
return
if (__name__ == "__main__") :
mgr = multiprocessing.Manager()
tasks=mgr.list()
tasks=[[1,10],[2,20],[3,30],[4,40],[5,50],[6,60]]
#executor=ProcessPoolExecutor(max_workers=3)
f=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
f.append(executor.submit(child_process,tasks))
f.append(executor.submit(child_process,tasks))
f.append(executor.submit(child_process,tasks))
# wait(future,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])
executor.shutdown()
错误是--
C:\Work\python\src\test>python test.py
Traceback (most recent call last):
File "C:\Work\python\src\test\test.py", line 70, in <module>
concurrent.futures.wait(f[0])
File "C:\tools\Python310\lib\concurrent\futures\_base.py", line 290, in wait
fs = set(fs)
TypeError: 'Future' object is not iterable
最让我困惑的是 f[0] 不是 submit() 返回的未来 object 吗?
然后我尝试了--
wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
新的错误是--
C:\Work\python\src\test>python test.py
C:\Work\python\src\test\test.py:68: RuntimeWarning: coroutine 'wait' was never awaited
wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
我真的不知道怎么解决。请孩子指教。谢谢
问候
艾森
需要指出的几点:
- 在
while
语句中用括号包裹表达式是多余的。
>>> a = 0
>>> while a < 10:
... a += 1
- 错误消息说 “未来对象不可迭代” - 这意味着,您传递的
f[0]
确实是 Future Object
,这不是等待方法期待。
>>> from concurrent import futures
>>> help(futures.wait)
Help on function wait in module concurrent.futures._base:
wait(fs, timeout=None, return_when='ALL_COMPLETED')
Wait for the futures in the given sequence to complete.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
wait upon.
# ...
这里我们可以看到论点fs
实际上期望你Futures
的Sequence
。
所以不是这个:
concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])
你可能想要这个:
concurrent.futures.wait(f)
这仍然不是必需的,因为 with
阻塞等待直到所有进程停止。
演示如下:
"""
Demo codes for
Waiting for child process to complete
"""
import os
import math
import queue
import multiprocessing as mp
from concurrent import futures
def child_process(task_queue: mp.Queue):
pid = os.getpid()
print(f"[{pid}] Started!")
processed_count = 0
while True:
try:
item = task_queue.get_nowait()
except queue.Empty:
# task done
break
# else continue on
# some workload
try:
print(f"[{pid}] {item}! = {math.factorial(item)}")
finally:
# tell queue we processed the item.
task_queue.task_done()
processed_count += 1
print(f"[{pid}] Task done!")
def main():
# just merely rapping codes in function namespace makes codes tiny bit faster
mp_manager = mp.Manager()
task_queue = mp_manager.Queue()
# populate queue
for n in range(100):
task_queue.put_nowait(n)
# start pool
with futures.ProcessPoolExecutor() as executor:
future_list = [executor.submit(child_process, task_queue) for _ in range(5)]
# can use executor.shutdown(wait=True) instead
# not required since all executor wait for all process to stop when exiting `with` block.
# hence, also no need to manually call executor.shutdown().
futures.wait(future_list)
if __name__ == '__main__':
main()
输出:
[18412] Started!
[18412] 0! = 1
[4680] Started!
[18412] 1! = 1
[2664] Started!
[18412] 2! = 2
[18412] 3! = 6
[17900] Started!
[18412] 4! = 24
[18412] 5! = 120
[4680] 6! = 720
[4680] 7! = 5040
[18412] 8! = 40320
[17900] 9! = 362880
[4680] 10! = 3628800
[18412] 11! = 39916800
...
[17900] 21! = 51090942171709440000
[4680] 22! = 1124000727777607680000
[2664] 23! = 25852016738884976640000
[16792] Started!
[18412] 24! = 620448401733239439360000
[17900] 25! = 15511210043330985984000000
...
[17900] 99! = 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000
[18412] Task done!
[17900] Task done!
[16792] Task done!
[2664] Task done!
[4680] Task done!
我正在学习 python,我在 concurrent.futures.wait() 上遇到了一些问题 -- 这是详细信息-- 我想让主进程保持到所有 child 进程完成。所以我使用 wait() 来阻止主进程。但是我总是出错,请帮助。
def child_process(args):
pid=os.getpid();
while (args.len() > 0 ):
task=args.pop(0)
time.sleep(1+ random.random()*5) #simulate the worker time
print("Process "+str(pid)+" : "+task[0]+" "+task[1])
return
if (__name__ == "__main__") :
mgr = multiprocessing.Manager()
tasks=mgr.list()
tasks=[[1,10],[2,20],[3,30],[4,40],[5,50],[6,60]]
#executor=ProcessPoolExecutor(max_workers=3)
f=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
f.append(executor.submit(child_process,tasks))
f.append(executor.submit(child_process,tasks))
f.append(executor.submit(child_process,tasks))
# wait(future,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])
executor.shutdown()
错误是--
C:\Work\python\src\test>python test.py
Traceback (most recent call last):
File "C:\Work\python\src\test\test.py", line 70, in <module>
concurrent.futures.wait(f[0])
File "C:\tools\Python310\lib\concurrent\futures\_base.py", line 290, in wait
fs = set(fs)
TypeError: 'Future' object is not iterable
最让我困惑的是 f[0] 不是 submit() 返回的未来 object 吗?
然后我尝试了--
wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
新的错误是--
C:\Work\python\src\test>python test.py
C:\Work\python\src\test\test.py:68: RuntimeWarning: coroutine 'wait' was never awaited
wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
我真的不知道怎么解决。请孩子指教。谢谢
问候 艾森
需要指出的几点:
- 在
while
语句中用括号包裹表达式是多余的。
>>> a = 0
>>> while a < 10:
... a += 1
- 错误消息说 “未来对象不可迭代” - 这意味着,您传递的
f[0]
确实是Future Object
,这不是等待方法期待。
>>> from concurrent import futures
>>> help(futures.wait)
Help on function wait in module concurrent.futures._base:
wait(fs, timeout=None, return_when='ALL_COMPLETED')
Wait for the futures in the given sequence to complete.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
wait upon.
# ...
这里我们可以看到论点fs
实际上期望你Futures
的Sequence
。
所以不是这个:
concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])
你可能想要这个:
concurrent.futures.wait(f)
这仍然不是必需的,因为 with
阻塞等待直到所有进程停止。
演示如下:
"""
Demo codes for
Waiting for child process to complete
"""
import os
import math
import queue
import multiprocessing as mp
from concurrent import futures
def child_process(task_queue: mp.Queue):
pid = os.getpid()
print(f"[{pid}] Started!")
processed_count = 0
while True:
try:
item = task_queue.get_nowait()
except queue.Empty:
# task done
break
# else continue on
# some workload
try:
print(f"[{pid}] {item}! = {math.factorial(item)}")
finally:
# tell queue we processed the item.
task_queue.task_done()
processed_count += 1
print(f"[{pid}] Task done!")
def main():
# just merely rapping codes in function namespace makes codes tiny bit faster
mp_manager = mp.Manager()
task_queue = mp_manager.Queue()
# populate queue
for n in range(100):
task_queue.put_nowait(n)
# start pool
with futures.ProcessPoolExecutor() as executor:
future_list = [executor.submit(child_process, task_queue) for _ in range(5)]
# can use executor.shutdown(wait=True) instead
# not required since all executor wait for all process to stop when exiting `with` block.
# hence, also no need to manually call executor.shutdown().
futures.wait(future_list)
if __name__ == '__main__':
main()
输出:
[18412] Started!
[18412] 0! = 1
[4680] Started!
[18412] 1! = 1
[2664] Started!
[18412] 2! = 2
[18412] 3! = 6
[17900] Started!
[18412] 4! = 24
[18412] 5! = 120
[4680] 6! = 720
[4680] 7! = 5040
[18412] 8! = 40320
[17900] 9! = 362880
[4680] 10! = 3628800
[18412] 11! = 39916800
...
[17900] 21! = 51090942171709440000
[4680] 22! = 1124000727777607680000
[2664] 23! = 25852016738884976640000
[16792] Started!
[18412] 24! = 620448401733239439360000
[17900] 25! = 15511210043330985984000000
...
[17900] 99! = 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000
[18412] Task done!
[17900] Task done!
[16792] Task done!
[2664] Task done!
[4680] Task done!