引入多处理队列时执行时间增加
Increase in execution time when introducing a multiprocessing queue
我正在尝试使用 Python 的多处理包,特别是使用 Process 函数来测量我拥有 "parallelized" 的一段代码。
我有两个函数想要并行 运行:function1
和 function2
。 function1
不是 return 值,而 function2
是。 function2 的 return 值是一个相当大的 class 实例。
这是我使用队列并行化和获取 return 值的现有代码:
import multiprocessing as mpc
...
def Wrapper(self,...):
jobs = []
q = mpc.Queue()
p1 = mpc.Process(target=self.function1,args=(timestep,))
jobs.append(p1)
p2 = mpc.Process(target=self.function2,args=(timestep,arg1,arg2,arg3,...,q))
jobs.append(p2)
for j in jobs:
j.start()
result = q.get()
for j in jobs:
j.join()
所以,这就是我遇到的问题。如果我删除对 result = q.get()
的调用,执行 Wrapper 函数所需的时间会显着减少,因为它不是 return 从 function2
调用 class,但是我显然不要从函数中获取我需要的数据。如果我把它放回去,运行 时间会显着增加,从而表明并行化实际上比顺序执行这两个函数花费的时间更长。
以下是 Wrapper 的一些平均执行时间,供参考:
顺序码(即function1(timestep)
、res = function2(timestep,a1,a2,a3,...,None)
):10秒
不使用队列的并行代码:8 秒
带有队列的并行代码:60 秒
我使用此代码的目的是展示并行化一段代码如何缩短执行不必要的并行函数所需的时间。作为参考,我正在使用 cProfile 包,生成我的代码的配置文件,并查看包装器 运行.
所需的时间
我开始对整个过程感到沮丧。它旨在基本上加快我添加到内部开发的现有自定义框架中的部分程序的速度,但是我无法实际证明我没有增加太多开销。
如果我查看程序的整体执行时间,并行代码 运行 会快得多。然而,当我深入挖掘时,我的并行化代码开始花费更长的时间。
现在,我的想法是队列正在执行某种深度复制操作,但是我找不到说明该事实的引用,所以我假设它是 return 浅复制,对我来说,不需要这样的开销。
当您将对象传递给 multiprocessing.Queue
时,需要在 put
端对其进行 pickle,然后 pickle 的字节必须被刷新到管道中。在 get
端,需要从管道中读取 pickle 字节,然后将它们解压回 Python 对象。所以实际上,multiprocessing.Queue
做的事情比深拷贝还要慢。
您看到的开销几乎可以肯定是解封大型对象所需开销的结果。这是 Python 真正困难的并行编程领域 - 如果您正在执行 CPU 绑定操作(因此不能使用线程来获得并行性)并且需要共享状态,那么您将支付性能损失。如果您共享大型对象,惩罚也可能很大。 Python 中的并行性是通过并行化一些 CPU 绑定操作获得的性能提升与必须共享的性能 惩罚 之间的权衡进程之间的状态。因此,您的目标需要是最小化共享状态的数量,并最大化并行化的工作量。
不幸的是,一旦你这样做了,你进一步减轻性能影响的选择就有些受限了。您可以尝试将 class 转换为 ctypes
对象,这样您就可以使用 multiprocessing.sharedctypes
在共享内存中创建对象。这应该比通过 Queue
返回对象更快,但是您必须处理 ctypes
.
的所有限制
另一个想法是在 multiprocessing.Manager
server. If you do this, your actual object will live in a server process, and both your parent and child process will access the object via a Proxy
中创建您的对象。但是,这会使对象的每个 read/write 变慢,因此最终它可能不会比您现在的 Queue
实现更好。
这些替代方案都不是很好,并且可能都不适用于您的用例,在这种情况下,Python 可能不是解决此特定问题的最佳语言。不要误会我的意思;我喜欢 Python 并尽可能使用它,但这是一个真正困难的领域。
我正在尝试使用 Python 的多处理包,特别是使用 Process 函数来测量我拥有 "parallelized" 的一段代码。
我有两个函数想要并行 运行:function1
和 function2
。 function1
不是 return 值,而 function2
是。 function2 的 return 值是一个相当大的 class 实例。
这是我使用队列并行化和获取 return 值的现有代码:
import multiprocessing as mpc
...
def Wrapper(self,...):
jobs = []
q = mpc.Queue()
p1 = mpc.Process(target=self.function1,args=(timestep,))
jobs.append(p1)
p2 = mpc.Process(target=self.function2,args=(timestep,arg1,arg2,arg3,...,q))
jobs.append(p2)
for j in jobs:
j.start()
result = q.get()
for j in jobs:
j.join()
所以,这就是我遇到的问题。如果我删除对 result = q.get()
的调用,执行 Wrapper 函数所需的时间会显着减少,因为它不是 return 从 function2
调用 class,但是我显然不要从函数中获取我需要的数据。如果我把它放回去,运行 时间会显着增加,从而表明并行化实际上比顺序执行这两个函数花费的时间更长。
以下是 Wrapper 的一些平均执行时间,供参考:
顺序码(即
function1(timestep)
、res = function2(timestep,a1,a2,a3,...,None)
):10秒不使用队列的并行代码:8 秒
带有队列的并行代码:60 秒
我使用此代码的目的是展示并行化一段代码如何缩短执行不必要的并行函数所需的时间。作为参考,我正在使用 cProfile 包,生成我的代码的配置文件,并查看包装器 运行.
所需的时间我开始对整个过程感到沮丧。它旨在基本上加快我添加到内部开发的现有自定义框架中的部分程序的速度,但是我无法实际证明我没有增加太多开销。
如果我查看程序的整体执行时间,并行代码 运行 会快得多。然而,当我深入挖掘时,我的并行化代码开始花费更长的时间。
现在,我的想法是队列正在执行某种深度复制操作,但是我找不到说明该事实的引用,所以我假设它是 return 浅复制,对我来说,不需要这样的开销。
当您将对象传递给 multiprocessing.Queue
时,需要在 put
端对其进行 pickle,然后 pickle 的字节必须被刷新到管道中。在 get
端,需要从管道中读取 pickle 字节,然后将它们解压回 Python 对象。所以实际上,multiprocessing.Queue
做的事情比深拷贝还要慢。
您看到的开销几乎可以肯定是解封大型对象所需开销的结果。这是 Python 真正困难的并行编程领域 - 如果您正在执行 CPU 绑定操作(因此不能使用线程来获得并行性)并且需要共享状态,那么您将支付性能损失。如果您共享大型对象,惩罚也可能很大。 Python 中的并行性是通过并行化一些 CPU 绑定操作获得的性能提升与必须共享的性能 惩罚 之间的权衡进程之间的状态。因此,您的目标需要是最小化共享状态的数量,并最大化并行化的工作量。
不幸的是,一旦你这样做了,你进一步减轻性能影响的选择就有些受限了。您可以尝试将 class 转换为 ctypes
对象,这样您就可以使用 multiprocessing.sharedctypes
在共享内存中创建对象。这应该比通过 Queue
返回对象更快,但是您必须处理 ctypes
.
另一个想法是在 multiprocessing.Manager
server. If you do this, your actual object will live in a server process, and both your parent and child process will access the object via a Proxy
中创建您的对象。但是,这会使对象的每个 read/write 变慢,因此最终它可能不会比您现在的 Queue
实现更好。
这些替代方案都不是很好,并且可能都不适用于您的用例,在这种情况下,Python 可能不是解决此特定问题的最佳语言。不要误会我的意思;我喜欢 Python 并尽可能使用它,但这是一个真正困难的领域。