Python - 多处理的奇怪行为 - 连接不执行

Python - weird behavior with multiprocessing - join does not execute

我正在使用 multiprocessing python 模块。我同时有大约 20-25 个任务要 运行。每个任务将创建一个约 20k 行的 pandas.DataFrame 对象。问题是,所有任务都执行得很好,但是当涉及到 "joining" 个进程时,它就停止了。我试过 "small" DataFrames 并且效果很好。为了说明我的观点,我创建了以下代码。

import pandas
import multiprocessing as mp

def task(arg, queue):
    DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
    queue.put(DF)
    print("DF %d stored" %arg)

listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]

for p in processes:
    p.start()

for i,p in enumerate(processes):
    print("joining %d" %i)
    p.join()

results = [queue.get() for p in processes]

编辑:

使用 DF = pandas.DataFrame({"hello":range(10)}) 我一切都正确:"DF 0 stored" 直到 "DF 19 stored",同"joining 0"至"joining 19".

然而 DF = pandas.DataFrame({"hello":range(1000)}) 问题出现了:当它存储 DF 时,加入步骤在 "joining 3".

后停止

感谢您提供有用的提示:)

这个问题在文档中有解释,在 Pipes and Queues:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

使用管理器会奏效,但有很多更简单的方法可以解决这个问题:

  1. 先从队列中读取数据,然后加入进程,而不是相反。
  2. 手动管理 Queue(例如,使用 JoinableQueuetask_done)。
  3. 只需使用Pool.map,而不是重新发明轮子。 (是的,Pool 所做的很多事情对于您的用例来说并不是必需的——但它也不会妨碍您,而且好消息是,您已经知道它有效。)

我不会展示 #1 的实现,因为它太琐碎了,或者 #2 的实现,因为它太痛苦了,但是 #3:

def task(arg):
    DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
    return DF

with mp.Pool(processes=20) as p:
    results = p.map(task, range(20), chunksize=1)

(在 2.7 中,Pool 可能无法在 with 语句中工作;您可以将更高版本的 multiprocessing 的端口安装回 PyPI 的 2.7,或者您可以只需手动创建池,然后 close 它在 try/finally 中,如果它在 with 语句中不起作用,您将处理一个文件... )


您可能会问自己,为什么此时它会失败,但使用更小的数字——甚至只是小一点点?

那个DataFrame的pickle刚好超过16K。 (这个列表本身有点小,但如果你用 10000 而不是 1000 来尝试,你应该会看到没有 Pandas 的相同结果。)

因此,第一个 child 写入 16K,然后阻塞,直到有空间写入最后几百个字节。但是在 join 之前,你不会从管道中取出任何东西(通过调用 queue.get),并且在他们退出之前你不能 join,而他们在你退出之前不能这样做疏通管道,所以这是一个典型的僵局。有足够的空间让前 4 个通过,但没有空间让 5 个通过。因为你有 4 个核心,所以大多数时候,前 4 个通过的将是前 4 个。但偶尔#4 会击败#3 或其他什么,然后你将无法加入#3。如果是 8 核机器,这种情况会更频繁地发生。