池、队列、挂起

Pool, queue, hang

我想使用队列来保存结果,因为我希望消费者(串行而非并行)在工作人员生成结果时处理工作人员的结果。

现在,我想知道为什么以下程序挂起。

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg 
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()

for i in range(4):
    x[0] = i 
    #worker((q,x))
    p.apply_async(worker, args=((q, x),)) 

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

更改 apply_async 以应用给出错误消息:

"Queue objects should only be shared between processes through inheritance"

解决方案:

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()

for i in range(4):
    x[0] = i
    #worker((q,x))
    p.apply_async(worker, args=((q, x),))

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

结果:

done_apply
3
3
3
3

显然,我需要手动复制 numpy 数组,因为所需的结果应该是 0、1、2、3,而不是 3、3、3、3。

Queue 个对象无法共享。我首先通过找到这个 answer.

得出了与 OP 相同的结论

不幸的是,此代码中还有其他问题(这并不能使其与链接的答案完全相同)

  • worker(arg) 应该是 worker(*arg) args 解包工作。没有它,我的进程也被锁定了(我承认我不知道为什么。它应该抛出一个异常,但我想多处理和异常不能很好地协同工作)
  • 将相同的 x 传递给工作人员会得到相同的结果(apply 有效,但 apply_async

另一件事:为了代码的可移植性,将主要代码包装在 if __name__ == "__main__": 上,因为进程产生的差异 Windows 需要

为我输出 0,3,2,1 的完全固定代码:

import multiprocessing as mp
import time
import numpy as np
def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)

    m = mp.Manager()  # use a manager, Queue objects cannot be shared
    q = m.Queue()

    for i in range(4):
        x = np.array([4,4])  # create array each time (or make a copy)
        x[0] = i
        p.apply_async(worker, args=(q, x))

    print("done_apply")
    time.sleep(0.2)
    for i in range(4):
        print(q.get())

我认为您选择将 multiprocessing.Pool 与您自己的 queue 一起使用是您遇到的主要问题的根源。使用池预先创建子进程,稍后将作业分配给这些子进程。但是由于您不能(轻松地)将 queue 传递给已经存在的进程,因此这不是您的问题的理想匹配。

相反,您应该摆脱自己的队列并使用池中内置的队列来获取由 worker 编辑的值 return,或者完全废弃池并使用 multiprocessing.Process 为您必须完成的每个任务启动一个新流程。

我还注意到您的代码在修改 x 数组的主线程和在将旧值发送到工作进程之前序列化旧值的线程之间的主进程中存在竞争条件。大多数时候,您可能最终会发送同一数组的许多副本(具有最终值),而不是您想要的几个不同的值。

这是一个快速且未经测试的版本,它可以删除队列:

def worker(arr):
    time.sleep(0.2)
    return arr[0]

if __name__ == "__main__":
    p = mp.Pool(4)
    results = p.map(worker, [np.array([i, 4]) for i in range(4)])
    p.join()
    for result in results:
        print(result)

这是一个删除 Pool 并保留队列的版本:

def worker(q, arr): 
    time.sleep(0.2)
    q.put(arr[0])

if __name__ == "__main__":
    q = m.Queue()
    processes = []

    for i in range(4):
        p = mp.Process(target=worker, args=(q, np.array([i, 4])))
        p.start()
        processes.append(p)

    for i in range(4):
        print(q.get())

    for p in processes:
        p.join()

请注意,在上一个版本中,在我们尝试 join 进程之前,我们 get 来自队列的结果可能很重要(尽管如果我们只处理四个进程可能不是值)。如果队列要填满,如果我们按其他顺序执行,则可能会发生死锁。工作进程可能在尝试写入队列时被阻塞,而主进程在等待工作进程退出时被阻塞。