使用 Python Joblib 的双并行循环
Double parallel loop with Python Joblib
美好的一天
我正在尝试加速涉及许多独立积分的计算。为此,我使用 pythons Joblib 和多处理。到目前为止,我已经成功地并行化了计算的内部循环,但我想对外部循环执行相同的操作。由于并行编程困扰着我,我想知道是否有人可以帮助我。到目前为止我有:
from joblib import Parallel, delayed
import multiprocessing
N = 10 # Some number
inputs = range(1,N,2)
num_cores = multiprocessing.cpu_count()
def processInput(n):
u_1 = lambda x,y: f(x,y)g(n,m) # Some function
Cn = scintegrate.nquad(u_1, [[A,B],[C,D]]) # A number
return Cn*F(x,y)*G(n,m)
resultsN = []
for m in range(1,N,2): # How can this be parallelized?
add = Parallel(n_jobs=num_cores)(delayed(processInput)(n) for n in inputs)
resultsN = add + resultsN
resultsN = sum(resultsN)
到目前为止,这已经产生了正确的结果。现在我想对外循环做同样的事情。有谁知道我该怎么做?
我也想知道 u_1 声明是否可以在 processInput 之外完成,如有任何其他改进建议,我们将不胜感激。
感谢您的回复。
如果我理解正确,你 运行 你的函数 processInput(n)
用于 n
值的范围,你需要这样做 m
次并将所有内容加在一起.在这里,索引 m
只记录你想要 运行 你的处理函数多少次并将结果加在一起,但没有别的。这使您可以仅通过一层并行性来完成所有工作,即创建一个已经包含重复值的输入列表,并将该工作负载分配给您的核心。快速的直觉是,不是并行处理输入 [1,2,3,4]
然后重复多次,而是 运行 并行输入 [1,1,1,2,2,2,3,3,3,4,4,4]
。它可能是这样的(我已经将您的函数更改为我可以 运行 的更简单的函数)。
import numpy as np
from joblib import Parallel, delayed
import multiprocessing
from math import ceil
N = 10 # Some number
inputs = range(1,N,2)
num_cores = multiprocessing.cpu_count()
def processInput(n): # toy function
return n
resultsN = []
# your original solution with an additional loop that needs
# to be parallelized
for m in range(1,N,2):
add = Parallel(n_jobs=num_cores)(delayed(processInput)(n) for n in inputs)
resultsN = add + resultsN
resultsN = sum(resultsN)
print resultsN
# solution with only one layer of parallelization
ext_inputs = np.repeat(inputs,ceil(m/2.0)).tolist()
add = Parallel(n_jobs=num_cores)(delayed(processInput)(n) for n in ext_inputs)
resultsN = sum(add)
print resultsN
ceil
是必需的,因为在您的原始循环中,m
会跳过每个第二个值。
美好的一天
我正在尝试加速涉及许多独立积分的计算。为此,我使用 pythons Joblib 和多处理。到目前为止,我已经成功地并行化了计算的内部循环,但我想对外部循环执行相同的操作。由于并行编程困扰着我,我想知道是否有人可以帮助我。到目前为止我有:
from joblib import Parallel, delayed
import multiprocessing
N = 10 # Some number
inputs = range(1,N,2)
num_cores = multiprocessing.cpu_count()
def processInput(n):
u_1 = lambda x,y: f(x,y)g(n,m) # Some function
Cn = scintegrate.nquad(u_1, [[A,B],[C,D]]) # A number
return Cn*F(x,y)*G(n,m)
resultsN = []
for m in range(1,N,2): # How can this be parallelized?
add = Parallel(n_jobs=num_cores)(delayed(processInput)(n) for n in inputs)
resultsN = add + resultsN
resultsN = sum(resultsN)
到目前为止,这已经产生了正确的结果。现在我想对外循环做同样的事情。有谁知道我该怎么做?
我也想知道 u_1 声明是否可以在 processInput 之外完成,如有任何其他改进建议,我们将不胜感激。
感谢您的回复。
如果我理解正确,你 运行 你的函数 processInput(n)
用于 n
值的范围,你需要这样做 m
次并将所有内容加在一起.在这里,索引 m
只记录你想要 运行 你的处理函数多少次并将结果加在一起,但没有别的。这使您可以仅通过一层并行性来完成所有工作,即创建一个已经包含重复值的输入列表,并将该工作负载分配给您的核心。快速的直觉是,不是并行处理输入 [1,2,3,4]
然后重复多次,而是 运行 并行输入 [1,1,1,2,2,2,3,3,3,4,4,4]
。它可能是这样的(我已经将您的函数更改为我可以 运行 的更简单的函数)。
import numpy as np
from joblib import Parallel, delayed
import multiprocessing
from math import ceil
N = 10 # Some number
inputs = range(1,N,2)
num_cores = multiprocessing.cpu_count()
def processInput(n): # toy function
return n
resultsN = []
# your original solution with an additional loop that needs
# to be parallelized
for m in range(1,N,2):
add = Parallel(n_jobs=num_cores)(delayed(processInput)(n) for n in inputs)
resultsN = add + resultsN
resultsN = sum(resultsN)
print resultsN
# solution with only one layer of parallelization
ext_inputs = np.repeat(inputs,ceil(m/2.0)).tolist()
add = Parallel(n_jobs=num_cores)(delayed(processInput)(n) for n in ext_inputs)
resultsN = sum(add)
print resultsN
ceil
是必需的,因为在您的原始循环中,m
会跳过每个第二个值。