如何成功利用 Queue.join() 进行多处理?
How to successfully utilize Queue.join() with multiprocessing?
我正在尝试学习 Python 中的多处理库,但我无法让我的代码与 queue.Queue
一起工作。简而言之,我不知道将 queue.Queue.join()
方法放在我的代码中的什么位置。它是在 while 循环中还是在循环之外?如果它超出了 while 循环,我是否写 while q.not_empty
?当文档明确提到使用 join()
时,我为什么要使用 q.not_empty
?
这是我的代码。我期待我的 4 个核心,同时 return 我的函数计算的素数数量,每个核心 2 次,总共 8 次计算。基本计算功能可以正常工作。
import queue
def main():
q = queue.Queue()
[q.put((compute_primes, (1, 30000))) for _ in range(8)]
with multiprocessing.Pool(processes=4) as pool:
while q.not_empty:
result = q.get()
function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
function.get()
q.join()
使用上面的代码,如果队列为空,我就跳出循环。但这应该是不真实的,为什么我之后需要 q.join()
?
使用下面的代码,我无法跳出循环。变化是 while True
和 q.join()
的位置
def main():
q = queue.Queue()
[q.put((compute_primes, (1, 30000))) for _ in range(8)]
with multiprocessing.Pool(processes=4) as pool:
while True:
result = q.get()
function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
function.get()
q.join()
我应该把q.join
放在哪里?
P.S。这段代码也没有有效地并行化任务,它本质上是一个一个地计算函数,我不明白为什么,但这是一个不同的问题。
P.S。 2
质数函数代码
def compute_primes(start, end):
start_time = time.time()
primes = []
for number in range(start, end + 1):
flag = True
for i in range(2, number):
if (number % i) == 0:
flag = False
break
if flag:
primes.append(number)
end_time = time.time()
print(f"Time taken: {end_time - start_time}\n"
f"Amount primes: {len(primes)}")
return primes
队列和池
Running one at a time... separate problem.
实际上,这是同一个问题的一部分。这一切都意味着你 不是
使用由 Pool
管理的多处理池。你现在要做的是
把你所有的任务放在一个队列中,让他们再次直接退出,然后
使用池一次处理一个,一次只能处理一个任务
时间。这两种范例是相互排斥的:如果你想使用一个池来
为您完成工作,您不需要排队;如果你需要处理队列
你自己,你可能不想使用 pool
.
池
multiprocessing.Pool
和伴随的方法产生正确数量的工人
进程,将你的函数序列化给它们,然后在内部设置一个队列
并处理发送任务和获取结果。这比做起来容易得多
手动操作,这通常是正确的处理方式:
当你使用池时,你会做这样的事情:
results = pool.map(compute_primes, [(0,100_000) for _ in range(8)])
这会为您阻塞,直到所有池都完成,或者:
results = pool.map_async(compute_primes, [(0, 100_000) for _ in range(8)])
results.wait() # wait
除非您计划在结果出现时对其进行处理,在这种情况下您不需要
完全使用 results.wait()
:
for _ in range(8):
result = results.get()
do_stuff(result)
您使用 pool.join()
或 pool.close()
只是为了确保 池 已关闭
优雅地下降,但这与获得结果无关。
你的例子
您的第一个示例有效,因为您这样做了:
- 将任务放入队列
- 一张一张拿出来加工
- 加入一个空队列 -> 立即离开
你的第二个例子失败了,因为你这样做了:
- 将任务放入队列
- 完成一项任务
- 等待队列为空或完成 -> 无限期阻塞
在这种情况下,您根本不需要队列。
手动使用队列
旁白:你从哪里得到 Queue
? multiprocessing.Queue
不是
可加入;你需要 multiprocessing.JoinableQueue
。 threading.Queue
应该
不能与 multiprocessing
一起使用。 queue.Queue
,同样,不应该使用
使用`多处理。
你什么时候使用任务队列?当你不想只是想应用一堆
一堆函数的参数。也许您想使用自定义 class。
也许您想做一些有趣的事情。也许你想做一些事情
一种类型的论证,但如果论证属于某种类型,则为其他类型,
这样可以更好地组织代码。在这些情况下,subclassing Process
(或
Thread
for multithreading) 你自己可能会更清晰。 None 个
似乎适用于这种情况。
对队列使用join
.join()
用于 task 队列。它会阻塞,直到队列中的每个任务都有
被标记为完成。当你想卸载一些处理时这很方便
到一堆进程,但在你做任何事情之前等待它们。那么你
通常做这样的事情:
tasks = JoinableQueue()
for t in qs:
tasks.put(t)
start_multiprocessing() # dummy fn
tasks.join() # wait for everything to be done
但是在这种情况下,您不这样做,或者不想这样做。
我不希望为 Pool 构造函数指定参数,除非出于某种原因我需要很少的并发进程。通过不带参数构造 Pool ,潜在的并发进程数将因计算机而异,具体取决于其 CPU 体系结构。以下是我将如何执行您的任务(假设我完全理解您的用例):
from multiprocessing import Pool
def genPrime(): # prime number generator
D = {}
q = 2
while True:
if q not in D:
yield q
D[q * q] = [q]
else:
for p in D[q]:
D.setdefault(p + q, []).append(p)
del D[q]
q += 1
def compute_primes(n):
g = genPrime()
return [next(g) for _ in range(n)]
NCOMPUTATIONS = 8
NPRIMES = 30_000
def main():
with Pool() as pool:
ar = []
for _ in range(NCOMPUTATIONS):
ar.append(pool.apply_async(compute_primes, [NPRIMES]))
for _ar in ar:
result = _ar.get() # waits for process to terminate and get its return value
assert len(result) == NPRIMES
if __name__ == '__main__':
main()
[请注意我不是genPrime函数的作者]
我正在尝试学习 Python 中的多处理库,但我无法让我的代码与 queue.Queue
一起工作。简而言之,我不知道将 queue.Queue.join()
方法放在我的代码中的什么位置。它是在 while 循环中还是在循环之外?如果它超出了 while 循环,我是否写 while q.not_empty
?当文档明确提到使用 join()
时,我为什么要使用 q.not_empty
?
这是我的代码。我期待我的 4 个核心,同时 return 我的函数计算的素数数量,每个核心 2 次,总共 8 次计算。基本计算功能可以正常工作。
import queue
def main():
q = queue.Queue()
[q.put((compute_primes, (1, 30000))) for _ in range(8)]
with multiprocessing.Pool(processes=4) as pool:
while q.not_empty:
result = q.get()
function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
function.get()
q.join()
使用上面的代码,如果队列为空,我就跳出循环。但这应该是不真实的,为什么我之后需要 q.join()
?
使用下面的代码,我无法跳出循环。变化是 while True
和 q.join()
def main():
q = queue.Queue()
[q.put((compute_primes, (1, 30000))) for _ in range(8)]
with multiprocessing.Pool(processes=4) as pool:
while True:
result = q.get()
function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
function.get()
q.join()
我应该把q.join
放在哪里?
P.S。这段代码也没有有效地并行化任务,它本质上是一个一个地计算函数,我不明白为什么,但这是一个不同的问题。
P.S。 2
质数函数代码
def compute_primes(start, end):
start_time = time.time()
primes = []
for number in range(start, end + 1):
flag = True
for i in range(2, number):
if (number % i) == 0:
flag = False
break
if flag:
primes.append(number)
end_time = time.time()
print(f"Time taken: {end_time - start_time}\n"
f"Amount primes: {len(primes)}")
return primes
队列和池
Running one at a time... separate problem.
实际上,这是同一个问题的一部分。这一切都意味着你 不是
使用由 Pool
管理的多处理池。你现在要做的是
把你所有的任务放在一个队列中,让他们再次直接退出,然后
使用池一次处理一个,一次只能处理一个任务
时间。这两种范例是相互排斥的:如果你想使用一个池来
为您完成工作,您不需要排队;如果你需要处理队列
你自己,你可能不想使用 pool
.
池
multiprocessing.Pool
和伴随的方法产生正确数量的工人
进程,将你的函数序列化给它们,然后在内部设置一个队列
并处理发送任务和获取结果。这比做起来容易得多
手动操作,这通常是正确的处理方式:
当你使用池时,你会做这样的事情:
results = pool.map(compute_primes, [(0,100_000) for _ in range(8)])
这会为您阻塞,直到所有池都完成,或者:
results = pool.map_async(compute_primes, [(0, 100_000) for _ in range(8)])
results.wait() # wait
除非您计划在结果出现时对其进行处理,在这种情况下您不需要
完全使用 results.wait()
:
for _ in range(8):
result = results.get()
do_stuff(result)
您使用 pool.join()
或 pool.close()
只是为了确保 池 已关闭
优雅地下降,但这与获得结果无关。
你的例子
您的第一个示例有效,因为您这样做了:
- 将任务放入队列
- 一张一张拿出来加工
- 加入一个空队列 -> 立即离开
你的第二个例子失败了,因为你这样做了:
- 将任务放入队列
- 完成一项任务
- 等待队列为空或完成 -> 无限期阻塞
在这种情况下,您根本不需要队列。
手动使用队列
旁白:你从哪里得到 Queue
? multiprocessing.Queue
不是
可加入;你需要 multiprocessing.JoinableQueue
。 threading.Queue
应该
不能与 multiprocessing
一起使用。 queue.Queue
,同样,不应该使用
使用`多处理。
你什么时候使用任务队列?当你不想只是想应用一堆
一堆函数的参数。也许您想使用自定义 class。
也许您想做一些有趣的事情。也许你想做一些事情
一种类型的论证,但如果论证属于某种类型,则为其他类型,
这样可以更好地组织代码。在这些情况下,subclassing Process
(或
Thread
for multithreading) 你自己可能会更清晰。 None 个
似乎适用于这种情况。
对队列使用join
.join()
用于 task 队列。它会阻塞,直到队列中的每个任务都有
被标记为完成。当你想卸载一些处理时这很方便
到一堆进程,但在你做任何事情之前等待它们。那么你
通常做这样的事情:
tasks = JoinableQueue()
for t in qs:
tasks.put(t)
start_multiprocessing() # dummy fn
tasks.join() # wait for everything to be done
但是在这种情况下,您不这样做,或者不想这样做。
我不希望为 Pool 构造函数指定参数,除非出于某种原因我需要很少的并发进程。通过不带参数构造 Pool ,潜在的并发进程数将因计算机而异,具体取决于其 CPU 体系结构。以下是我将如何执行您的任务(假设我完全理解您的用例):
from multiprocessing import Pool
def genPrime(): # prime number generator
D = {}
q = 2
while True:
if q not in D:
yield q
D[q * q] = [q]
else:
for p in D[q]:
D.setdefault(p + q, []).append(p)
del D[q]
q += 1
def compute_primes(n):
g = genPrime()
return [next(g) for _ in range(n)]
NCOMPUTATIONS = 8
NPRIMES = 30_000
def main():
with Pool() as pool:
ar = []
for _ in range(NCOMPUTATIONS):
ar.append(pool.apply_async(compute_primes, [NPRIMES]))
for _ar in ar:
result = _ar.get() # waits for process to terminate and get its return value
assert len(result) == NPRIMES
if __name__ == '__main__':
main()
[请注意我不是genPrime函数的作者]