Deadlock with multiprocessing module

I have a function which, without multiprocessing, loops over an array of 3-tuples and does some calculations. This array can be very long (>1 million entries), so I thought using several processes might help speed things up.

I start with a list of points (random_points) and use it to create all possible combinations of three points (combList). This combList is then passed to my function. My basic code works, but only when the random_points list has 18 entries or fewer.

from scipy import stats
import itertools
import multiprocessing as mp
import numpy as np

def calc3PointsList( points,output ):
  xy = []
  r = []
  for point in points:
    # do stuff with point and append the results to xy and r
    pass
  output.put( (xy, r) )


output = mp.Queue()

random_points = [ (np.array((stats.uniform(-0.5,1).rvs(),stats.uniform(-0.5,1).rvs()))) for _ in range(18)]
combList = list(itertools.combinations(random_points, 3))
N = 6
processes = [mp.Process(target=calc3PointsList, args=(combList[(i-1)*len(combList)//(N-1):i*len(combList)//(N-1)], output)) for i in range(1, N)]

for p in processes:
  p.start()
for p in processes:
  p.join()
results = [output.get() for p in processes]

As soon as the random_points list is longer than 18 entries, the program runs into a deadlock. With 18 entries or fewer it finishes just fine. Am I using the whole multiprocessing module wrong?

I don't see anything else in what you posted that is obviously wrong, but there is one thing you should definitely do: start new processes inside an if __name__ == "__main__": block, see the programming guideline.

from scipy import stats
import itertools
import multiprocessing as mp
import numpy as np

def calc3PointsList( points,output ):
  xy = []
  r = []
  for point in points:
    # do stuff with point and append the results to xy and r
    pass
  output.put( (xy, r) )

if __name__ == "__main__":
    output = mp.Queue()
    random_points = [ (np.array((stats.uniform(-0.5,1).rvs(),stats.uniform(-0.5,1).rvs()))) for _ in range(18)]
    combList = list(itertools.combinations(random_points, 3))
    N = 6
    processes = [mp.Process(target=calc3PointsList, args=(combList[(i-1)*len(combList)//(N-1):i*len(combList)//(N-1)], output)) for i in range(1, N)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()
    results = [output.get() for x in range(output.qsize())]

OK, the problem is described in the programming guideline that user2667217 mentioned:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
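
In other words, a child that has put data on a queue cannot terminate until that data has been flushed into the pipe, so joining the child before draining the queue blocks both sides. A minimal sketch of the failure and the fix (the 10 MB payload is an arbitrary size chosen to exceed the pipe buffer, which is platform dependent):

import multiprocessing as mp

def worker(q):
    # A payload larger than the pipe buffer forces the queue's
    # feeder thread to block until the parent reads from the pipe.
    q.put("x" * 10_000_000)

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    # p.join()    # deadlock: the child waits for the queue to drain,
    #             # while the parent waits for the child to exit
    item = q.get()  # drain the queue first ...
    p.join()        # ... then joining is safe
    print(len(item))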

Removing the join operation made it work. The correct way to retrieve the results seems to be:

results = [output.get() for p in processes]
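
If you still want to join the workers, a common pattern is to drain the queue first and join afterwards. A sketch of the end of the main block, assuming each worker still puts exactly one (xy, r) tuple:

    for p in processes:
        p.start()
    # Drain the queue first (one result per process), so the feeder
    # threads can flush their buffered items before the join.
    results = [output.get() for p in processes]
    for p in processes:
        p.join()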