了解多处理程序中的 starmap_async()
Understand starmap_async() in multiprocessing programmation
上面的代码工作正常,但我不完全理解它:
from multiprocessing import Pool,Manager
from itertools import chain
def calculate_primes():
ncore = 2
N = 50
with Manager() as manager:
input = manager.Queue()
output = manager.Queue()
with Pool(ncore) as p:
it = p.starmap_async(find_prime,[(input,output)])
for r in chunks(range(1,N),10): # chunks the list into sublists
input.put(r)
input.put(None)
it.wait() # if here: doesn't work! Why?
output.put(None)
it.wait() # mut be here to works fine!!
res = list(chain(*list(iter(output.get,None))))
return res
find_prime() 是一个 returns 素数列表的函数。
什么时候p.starmap_async(somefunction,[(input,output)])指令真的执行了?
为什么这个函数需要执行it.wait()指令?
如果函数中没有it.wait()指令=>它returns空列表[]
如果 it.wait() 指令位于 inut.put(None) => 它似乎是一个我必须终止的无限循环。
为什么?非常感谢。
map
函数的想法是减轻程序员设置队列、从队列中读取并按顺序 assemble 响应的负担。您的代码可以简化为:
def find_prime(chunk):
primes_found = list(filter(check_prime, chunk))
return primes_found
def calculate_primes():
ncore = 2
N = 50
with Pool(ncore) as p:
res = p.map(find_prime, chunks(range(1,N), 10))
return res
(其中 chunks
做了我猜它做的事)
原代码存在以下问题:
- 当您调用
p.starmap_async(find_prime,[(input,output)])
时,您传递的是一个长度为 1 的可迭代对象,因此只会使用一个进程。您可以使用 [(input,output)] * ncore
而不是使用 ncore
个进程。
- 如果您 使用多个进程,则只有其中一个进程会从队列中获取
None
值,而另一个进程将永远等待。这个bug被前一个隐藏了。
it.wait()
永远等待进程完成。如果你把它放在 before input.put(None)
之前,生成的进程正在等待 None
值完成,而主进程正在等待生成的进程完成:那里是一个死锁(请注意,问题说的是一件事“t.wait() 指令位于 input.put(None)
之前”,但代码说的是另一件事)。
- 由于您使用的是
async
版本,您必须 wait
才能完成此过程。如果您不调用 wait
,您会立即在 output
队列中放置一个 None
,然后再派生的进程提供任何结果。因此,从队列中删除的第一个值是 None
,表示 iter
调用的迭代结束:您最终得到一个空列表。此外,您在进程仍在 运行 时退出 with Pool(ncore) as p:
并且(我猜)它们已终止。
传递数据块而不是一个一个地处理是一个好主意:这就是map
函数的chunksize
参数为了。然而,它对您当前实施的 find_prime
没有用
上面的代码工作正常,但我不完全理解它:
from multiprocessing import Pool,Manager
from itertools import chain
def calculate_primes():
ncore = 2
N = 50
with Manager() as manager:
input = manager.Queue()
output = manager.Queue()
with Pool(ncore) as p:
it = p.starmap_async(find_prime,[(input,output)])
for r in chunks(range(1,N),10): # chunks the list into sublists
input.put(r)
input.put(None)
it.wait() # if here: doesn't work! Why?
output.put(None)
it.wait() # mut be here to works fine!!
res = list(chain(*list(iter(output.get,None))))
return res
find_prime() 是一个 returns 素数列表的函数。
什么时候p.starmap_async(somefunction,[(input,output)])指令真的执行了? 为什么这个函数需要执行it.wait()指令?
如果函数中没有it.wait()指令=>它returns空列表[] 如果 it.wait() 指令位于 inut.put(None) => 它似乎是一个我必须终止的无限循环。
为什么?非常感谢。
map
函数的想法是减轻程序员设置队列、从队列中读取并按顺序 assemble 响应的负担。您的代码可以简化为:
def find_prime(chunk):
primes_found = list(filter(check_prime, chunk))
return primes_found
def calculate_primes():
ncore = 2
N = 50
with Pool(ncore) as p:
res = p.map(find_prime, chunks(range(1,N), 10))
return res
(其中 chunks
做了我猜它做的事)
原代码存在以下问题:
- 当您调用
p.starmap_async(find_prime,[(input,output)])
时,您传递的是一个长度为 1 的可迭代对象,因此只会使用一个进程。您可以使用[(input,output)] * ncore
而不是使用ncore
个进程。 - 如果您 使用多个进程,则只有其中一个进程会从队列中获取
None
值,而另一个进程将永远等待。这个bug被前一个隐藏了。 it.wait()
永远等待进程完成。如果你把它放在 beforeinput.put(None)
之前,生成的进程正在等待None
值完成,而主进程正在等待生成的进程完成:那里是一个死锁(请注意,问题说的是一件事“t.wait() 指令位于input.put(None)
之前”,但代码说的是另一件事)。- 由于您使用的是
async
版本,您必须wait
才能完成此过程。如果您不调用wait
,您会立即在output
队列中放置一个None
,然后再派生的进程提供任何结果。因此,从队列中删除的第一个值是None
,表示iter
调用的迭代结束:您最终得到一个空列表。此外,您在进程仍在 运行 时退出with Pool(ncore) as p:
并且(我猜)它们已终止。
传递数据块而不是一个一个地处理是一个好主意:这就是map
函数的chunksize
参数为了。然而,它对您当前实施的 find_prime