如何成功利用 Queue.join() 进行多处理?

How to successfully utilize Queue.join() with multiprocessing?

我正在尝试学习 Python 中的多处理库,但我无法让我的代码与 queue.Queue 一起工作。简而言之,我不知道将 queue.Queue.join() 方法放在我的代码中的什么位置。它是在 while 循环中还是在循环之外?如果它超出了 while 循环,我是否写 while q.not_empty?当文档明确提到使用 join() 时,我为什么要使用 q.not_empty

这是我的代码。我期待我的 4 个核心,同时 return 我的函数计算的素数数量,每个核心 2 次,总共 8 次计算。基本计算功能可以正常工作。

import queue
def main():
q = queue.Queue()
[q.put((compute_primes, (1, 30000))) for _ in range(8)]
with multiprocessing.Pool(processes=4) as pool:
    while q.not_empty:
        result = q.get()
        function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
        function.get()
    q.join()

使用上面的代码,如果队列为空,我就跳出循环。但这应该是不真实的,为什么我之后需要 q.join()

使用下面的代码,我无法跳出循环。变化是 while Trueq.join()

的位置
def main():
q = queue.Queue()
[q.put((compute_primes, (1, 30000))) for _ in range(8)]
with multiprocessing.Pool(processes=4) as pool:
    while True:
        result = q.get()
        function = pool.apply_async(result[0], args=(result[1][0], result[1][1]))
        function.get()
        q.join()

我应该把q.join放在哪里?

P.S。这段代码也没有有效地并行化任务,它本质上是一个一个地计算函数,我不明白为什么,但这是一个不同的问题。

P.S。 2

质数函数代码

def compute_primes(start, end):
start_time = time.time()
primes = []
for number in range(start, end + 1):
    flag = True
    for i in range(2, number):
        if (number % i) == 0:
            flag = False
            break
    if flag:
        primes.append(number)
end_time = time.time()
print(f"Time taken: {end_time - start_time}\n"
      f"Amount primes: {len(primes)}")
return primes

队列和池

Running one at a time... separate problem.

实际上,这是同一个问题的一部分。这一切都意味着你 不是 使用由 Pool 管理的多处理池。你现在要做的是 把你所有的任务放在一个队列中,让他们再次直接退出,然后 使用池一次处理一个,一次只能处理一个任务 时间。这两种范例是相互排斥的:如果你想使用一个池来 为您完成工作,您不需要排队;如果你需要处理队列 你自己,你可能不想使用 pool.

multiprocessing.Pool 和伴随的方法产生正确数量的工人 进程,将你的函数序列化给它们,然后在内部设置一个队列 并处理发送任务和获取结果。这比做起来容易得多 手动操作,这通常是正确的处理方式:

当你使用池时,你会做这样的事情:

results = pool.map(compute_primes, [(0,100_000) for _ in range(8)])

这会为您阻塞,直到所有池都完成,或者:

results = pool.map_async(compute_primes, [(0, 100_000) for _ in range(8)])
results.wait() # wait

除非您计划在结果出现时对其进行处理,在这种情况下您不需要 完全使用 results.wait()

for _ in range(8):
    result = results.get()
    do_stuff(result)

您使用 pool.join()pool.close() 只是为了确保 已关闭 优雅地下降,但这与获得结果无关。

你的例子

您的第一个示例有效,因为您这样做了:

  • 将任务放入队列
  • 一张一张拿出来加工
  • 加入一个空队列 -> 立即离开

你的第二个例子失败了,因为你这样做了:

  • 将任务放入队列
  • 完成一项任务
  • 等待队列为空或完成 -> 无限期阻塞

在这种情况下,您根本不需要队列。

手动使用队列

旁白:你从哪里得到 Queuemultiprocessing.Queue 不是 可加入;你需要 multiprocessing.JoinableQueuethreading.Queue应该 不能与 multiprocessing 一起使用。 queue.Queue,同样,不应该使用 使用`多处理。

你什么时候使用任务队列?当你不想只是想应用一堆 一堆函数的参数。也许您想使用自定义 class。 也许您想做一些有趣的事情。也许你想做一些事情 一种类型的论证,但如果论证属于某种类型,则为其他类型, 这样可以更好地组织代码。在这些情况下,subclassing Process(或 Thread for multithreading) 你自己可能会更清晰。 None 个 似乎适用于这种情况。

对队列使用join

.join() 用于 task 队列。它会阻塞,直到队列中的每个任务都有 被标记为完成。当你想卸载一些处理时这很方便 到一堆进程,但在你做任何事情之前等待它们。那么你 通常做这样的事情:

tasks = JoinableQueue()
for t in qs:
    tasks.put(t)
start_multiprocessing() # dummy fn
tasks.join() # wait for everything to be done

但是在这种情况下,您不这样做,或者不想这样做。

我不希望为 Pool 构造函数指定参数,除非出于某种原因我需要很少的并发进程。通过不带参数构造 Pool ,潜在的并发进程数将因计算机而异,具体取决于其 CPU 体系结构。以下是我将如何执行您的任务(假设我完全理解您的用例):

from multiprocessing import Pool


def genPrime():  # prime number generator
    D = {}
    q = 2
    while True:
        if q not in D:
            yield q
            D[q * q] = [q]
        else:
            for p in D[q]:
                D.setdefault(p + q, []).append(p)
            del D[q]
        q += 1


def compute_primes(n):
    g = genPrime()
    return [next(g) for _ in range(n)]


NCOMPUTATIONS = 8
NPRIMES = 30_000


def main():
    with Pool() as pool:
        ar = []
        for _ in range(NCOMPUTATIONS):
            ar.append(pool.apply_async(compute_primes, [NPRIMES]))
        for _ar in ar:
            result = _ar.get() # waits for process to terminate and get its return value
            assert len(result) == NPRIMES


if __name__ == '__main__':
    main()

[请注意我不是genPrime函数的作者]