multiprocessing python : 如何拆分作业
multiprocessing python : how to split jobs
我试图为多处理写一个python。一直在网上看,还是不懂怎么写。
我的脚本:
import multiprocessing as mp
import numpy as np
import ctypes
def func(M, j, :
coor_i = np.zeros(1000)
coor_j = np.ones(1000) # In reality it is loaded from a txt
A = np.square(coor_i - coor_j)
a = A.sum
M[j] = a
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
p.start()
p.join()
print(M)
我在网上看,我看到了- 'mp.Pool', 'processes', 'mp.Queue'
非常感谢。
首先,您的第一个错误是没有从您的 func()
函数中 return 任何东西。在 python 中,所有值都是对内存中对象的引用,当您进行赋值时,您将用新对象替换引用值。所以做M = -M
不是改变M
的对象,而是创建一个新对象,并在函数范围内更改引用M
。
意思是你的函数会总是returnNone
:
>>> from multiprocessing import Pool
>>> def func(M): # if you call with M=1
... # M==1 here
... M = -M
... # M==-1 here
...
>>> M = 1
>>> M = func(M)
>>> print(M)
None
要解决此问题,您需要将其设为 return 值:
>>> def func(M):
... return -M
...
>>> print(func(1))
-1
然后,并行化工作的最佳方法是使用进程池,以便您可以控制并行的实例数 运行,直接适应 from the documentation examples:
>>> def func(M):
>>> return -M
...
>>> pool = Pool(processes=4) # start 4 worker processes
>>> results = []
>>> for i in range(1,100):
... results.append(pool.apply_async(func, [i])) # evaluate "func(i)" asynchronously
...
>>> print [result.get() for result in results]
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
做同样事情的另一种方法是使用:
>>> print(pool.map(func, range(1,100)))
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
话虽如此,如果您所做的一切确实是在否定价值(或一些简单的东西)——我猜你不是——那么最好 [=72=]NOT 使用并行化,因为 python 并且您的微处理器将对工作进行矢量化处理并使其 运行 更快:
>>> print([-M for M in range(1,100)])
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
这是一个指标,因为 运行 在我的机器上 python2:
>>> from timeit import timeit
>>> timeit("[-M for M in range(1,100)]", number=100000)
0.6327948570251465
>>> def test():
... pool.map(func, range(1,100))
...
>>> timeit(test, number=100000)
31.26303195953369
编辑:
你的问题的问题在于,你想要达到什么目的,以及你想要做什么是很不清楚的。通常并行化的第一个途径是做一个非并行化的版本,并尝试并行化那些会更好地工作并且没有互斥的东西。
但是在您的代码中让我印象深刻的一件事是,对于 range(1,100)
循环的每次迭代,您实际上是在等待一个进程完成,然后再启动一个新进程:
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
# Start a process
p.start()
# wait for the process p to finish before going on
p.join()
# will continue when p has finished
如果你想解决这个问题,你可以使用我示例中所示的 Pool
或者:
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
# Start a process
p.start()
# wait for the process p to finish before going on
p.join()
# will continue when p has finished
所以一个快速的改进是:
processes = [] # keep a list of all the processes
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
for j in range(1,i):
p = mp.Process(target = func, args = (coor_i, j))
# Append to processes list
processes.append(p)
# Start a process
p.start()
# wait for all processes to have finished before quitting (or going on)
for p in processes:
p.join()
所以我刚刚更新了代码:在一个循环中启动所有进程,在另一个循环中阻塞直到它们完成。 .join()
使当前进程阻塞,直到另一个进程完成,因此第二个循环确保每个进程在完成您的代码段之前都已完成,或者继续执行进一步的代码。
因此对于 "generator" 问题,我没有注意到您在 mp.Process
行上执行了 for j in range(1, i)
,因此没有将进程附加到进程列表,而是列表生成器。
实际上你的conde 不能工作,编译时应该失败,因为:
p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i)
^
SyntaxError: invalid syntax
如果您按以下方式更正:
>>> (p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i))
那么你的原始代码 不能 工作,因为 p
不是一个进程,它是一个列表生成器并且 p.start()
不存在:
>>> p.next()
<Process(Process-5, initial)>
>>> p.next()
<Process(Process-6, initial)>
>>> p.next()
<Process(Process-7, initial)>
>>> p.next()
<Process(Process-8, initial)>
>>> p.next()
<Process(Process-9, initial)>
所以在这种情况下不使用 for one 衬里是一个坏主意,使用传统的 for 更正了这一点。
我试图为多处理写一个python。一直在网上看,还是不懂怎么写。
我的脚本:
import multiprocessing as mp
import numpy as np
import ctypes
def func(M, j, :
coor_i = np.zeros(1000)
coor_j = np.ones(1000) # In reality it is loaded from a txt
A = np.square(coor_i - coor_j)
a = A.sum
M[j] = a
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
p.start()
p.join()
print(M)
我在网上看,我看到了- 'mp.Pool', 'processes', 'mp.Queue'
非常感谢。
首先,您的第一个错误是没有从您的 func()
函数中 return 任何东西。在 python 中,所有值都是对内存中对象的引用,当您进行赋值时,您将用新对象替换引用值。所以做M = -M
不是改变M
的对象,而是创建一个新对象,并在函数范围内更改引用M
。
意思是你的函数会总是returnNone
:
>>> from multiprocessing import Pool
>>> def func(M): # if you call with M=1
... # M==1 here
... M = -M
... # M==-1 here
...
>>> M = 1
>>> M = func(M)
>>> print(M)
None
要解决此问题,您需要将其设为 return 值:
>>> def func(M):
... return -M
...
>>> print(func(1))
-1
然后,并行化工作的最佳方法是使用进程池,以便您可以控制并行的实例数 运行,直接适应 from the documentation examples:
>>> def func(M):
>>> return -M
...
>>> pool = Pool(processes=4) # start 4 worker processes
>>> results = []
>>> for i in range(1,100):
... results.append(pool.apply_async(func, [i])) # evaluate "func(i)" asynchronously
...
>>> print [result.get() for result in results]
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
做同样事情的另一种方法是使用:
>>> print(pool.map(func, range(1,100)))
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
话虽如此,如果您所做的一切确实是在否定价值(或一些简单的东西)——我猜你不是——那么最好 [=72=]NOT 使用并行化,因为 python 并且您的微处理器将对工作进行矢量化处理并使其 运行 更快:
>>> print([-M for M in range(1,100)])
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99]
这是一个指标,因为 运行 在我的机器上 python2:
>>> from timeit import timeit
>>> timeit("[-M for M in range(1,100)]", number=100000)
0.6327948570251465
>>> def test():
... pool.map(func, range(1,100))
...
>>> timeit(test, number=100000)
31.26303195953369
编辑:
你的问题的问题在于,你想要达到什么目的,以及你想要做什么是很不清楚的。通常并行化的第一个途径是做一个非并行化的版本,并尝试并行化那些会更好地工作并且没有互斥的东西。
但是在您的代码中让我印象深刻的一件事是,对于 range(1,100)
循环的每次迭代,您实际上是在等待一个进程完成,然后再启动一个新进程:
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
# Start a process
p.start()
# wait for the process p to finish before going on
p.join()
# will continue when p has finished
如果你想解决这个问题,你可以使用我示例中所示的 Pool
或者:
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i)
# Start a process
p.start()
# wait for the process p to finish before going on
p.join()
# will continue when p has finished
所以一个快速的改进是:
processes = [] # keep a list of all the processes
for i in range(1,100) :
M = mp.Array(ctypes.c_double, np.ones(i))
# Create process
for j in range(1,i):
p = mp.Process(target = func, args = (coor_i, j))
# Append to processes list
processes.append(p)
# Start a process
p.start()
# wait for all processes to have finished before quitting (or going on)
for p in processes:
p.join()
所以我刚刚更新了代码:在一个循环中启动所有进程,在另一个循环中阻塞直到它们完成。 .join()
使当前进程阻塞,直到另一个进程完成,因此第二个循环确保每个进程在完成您的代码段之前都已完成,或者继续执行进一步的代码。
因此对于 "generator" 问题,我没有注意到您在 mp.Process
行上执行了 for j in range(1, i)
,因此没有将进程附加到进程列表,而是列表生成器。
实际上你的conde 不能工作,编译时应该失败,因为:
p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i)
^
SyntaxError: invalid syntax
如果您按以下方式更正:
>>> (p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i))
那么你的原始代码 不能 工作,因为 p
不是一个进程,它是一个列表生成器并且 p.start()
不存在:
>>> p.next()
<Process(Process-5, initial)>
>>> p.next()
<Process(Process-6, initial)>
>>> p.next()
<Process(Process-7, initial)>
>>> p.next()
<Process(Process-8, initial)>
>>> p.next()
<Process(Process-9, initial)>
所以在这种情况下不使用 for one 衬里是一个坏主意,使用传统的 for 更正了这一点。