Python - 多处理的奇怪行为 - 连接不执行
Python - weird behavior with multiprocessing - join does not execute
我正在使用 multiprocessing
python 模块。我同时有大约 20-25 个任务要 运行。每个任务将创建一个约 20k 行的 pandas.DataFrame
对象。问题是,所有任务都执行得很好,但是当涉及到 "joining" 个进程时,它就停止了。我试过 "small" DataFrames 并且效果很好。为了说明我的观点,我创建了以下代码。
import pandas
import multiprocessing as mp
def task(arg, queue):
DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
queue.put(DF)
print("DF %d stored" %arg)
listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]
for p in processes:
p.start()
for i,p in enumerate(processes):
print("joining %d" %i)
p.join()
results = [queue.get() for p in processes]
编辑:
使用 DF = pandas.DataFrame({"hello":range(10)}) 我一切都正确:"DF 0 stored" 直到 "DF 19 stored",同"joining 0"至"joining 19".
然而 DF = pandas.DataFrame({"hello":range(1000)}) 问题出现了:当它存储 DF 时,加入步骤在 "joining 3".
后停止
感谢您提供有用的提示:)
这个问题在文档中有解释,在 Pipes and Queues:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread
), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
使用管理器会奏效,但有很多更简单的方法可以解决这个问题:
- 先从队列中读取数据,然后加入进程,而不是相反。
- 手动管理
Queue
(例如,使用 JoinableQueue
和 task_done
)。
- 只需使用
Pool.map
,而不是重新发明轮子。 (是的,Pool
所做的很多事情对于您的用例来说并不是必需的——但它也不会妨碍您,而且好消息是,您已经知道它有效。)
我不会展示 #1 的实现,因为它太琐碎了,或者 #2 的实现,因为它太痛苦了,但是 #3:
def task(arg):
DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
return DF
with mp.Pool(processes=20) as p:
results = p.map(task, range(20), chunksize=1)
(在 2.7 中,Pool
可能无法在 with
语句中工作;您可以将更高版本的 multiprocessing
的端口安装回 PyPI 的 2.7,或者您可以只需手动创建池,然后 close
它在 try
/finally
中,如果它在 with
语句中不起作用,您将处理一个文件... )
您可能会问自己,为什么此时它会失败,但使用更小的数字——甚至只是小一点点?
那个DataFrame的pickle刚好超过16K。 (这个列表本身有点小,但如果你用 10000 而不是 1000 来尝试,你应该会看到没有 Pandas 的相同结果。)
因此,第一个 child 写入 16K,然后阻塞,直到有空间写入最后几百个字节。但是在 join
之前,你不会从管道中取出任何东西(通过调用 queue.get
),并且在他们退出之前你不能 join
,而他们在你退出之前不能这样做疏通管道,所以这是一个典型的僵局。有足够的空间让前 4 个通过,但没有空间让 5 个通过。因为你有 4 个核心,所以大多数时候,前 4 个通过的将是前 4 个。但偶尔#4 会击败#3 或其他什么,然后你将无法加入#3。如果是 8 核机器,这种情况会更频繁地发生。
我正在使用 multiprocessing
python 模块。我同时有大约 20-25 个任务要 运行。每个任务将创建一个约 20k 行的 pandas.DataFrame
对象。问题是,所有任务都执行得很好,但是当涉及到 "joining" 个进程时,它就停止了。我试过 "small" DataFrames 并且效果很好。为了说明我的观点,我创建了以下代码。
import pandas
import multiprocessing as mp
def task(arg, queue):
DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
queue.put(DF)
print("DF %d stored" %arg)
listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]
for p in processes:
p.start()
for i,p in enumerate(processes):
print("joining %d" %i)
p.join()
results = [queue.get() for p in processes]
编辑:
使用 DF = pandas.DataFrame({"hello":range(10)}) 我一切都正确:"DF 0 stored" 直到 "DF 19 stored",同"joining 0"至"joining 19".
然而 DF = pandas.DataFrame({"hello":range(1000)}) 问题出现了:当它存储 DF 时,加入步骤在 "joining 3".
后停止感谢您提供有用的提示:)
这个问题在文档中有解释,在 Pipes and Queues:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used
JoinableQueue.cancel_join_thread
), then that process will not terminate until all buffered items have been flushed to the pipe.This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
使用管理器会奏效,但有很多更简单的方法可以解决这个问题:
- 先从队列中读取数据,然后加入进程,而不是相反。
- 手动管理
Queue
(例如,使用JoinableQueue
和task_done
)。 - 只需使用
Pool.map
,而不是重新发明轮子。 (是的,Pool
所做的很多事情对于您的用例来说并不是必需的——但它也不会妨碍您,而且好消息是,您已经知道它有效。)
我不会展示 #1 的实现,因为它太琐碎了,或者 #2 的实现,因为它太痛苦了,但是 #3:
def task(arg):
DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
return DF
with mp.Pool(processes=20) as p:
results = p.map(task, range(20), chunksize=1)
(在 2.7 中,Pool
可能无法在 with
语句中工作;您可以将更高版本的 multiprocessing
的端口安装回 PyPI 的 2.7,或者您可以只需手动创建池,然后 close
它在 try
/finally
中,如果它在 with
语句中不起作用,您将处理一个文件... )
您可能会问自己,为什么此时它会失败,但使用更小的数字——甚至只是小一点点?
那个DataFrame的pickle刚好超过16K。 (这个列表本身有点小,但如果你用 10000 而不是 1000 来尝试,你应该会看到没有 Pandas 的相同结果。)
因此,第一个 child 写入 16K,然后阻塞,直到有空间写入最后几百个字节。但是在 join
之前,你不会从管道中取出任何东西(通过调用 queue.get
),并且在他们退出之前你不能 join
,而他们在你退出之前不能这样做疏通管道,所以这是一个典型的僵局。有足够的空间让前 4 个通过,但没有空间让 5 个通过。因为你有 4 个核心,所以大多数时候,前 4 个通过的将是前 4 个。但偶尔#4 会击败#3 或其他什么,然后你将无法加入#3。如果是 8 核机器,这种情况会更频繁地发生。