多处理 - 限制 CPU 使用
Multiprocessing - limit CPU usage
感谢 :
,我有一个代码可以并行化
1| def function(name, params):
2| results = fits.open(name)
3| <do something more to results>
4| return results
5|
6| def function_wrapper(args):
7| return function(*args)
8|
9| params = [...,...,..., etc]
10|
11| p = multiprocessing..Pool(processes=(max([2, mproc.cpu_count() // 10])))
12| args_generator = ((name, params) for name in names)
13|
14| dictionary = dict(zip(names, p.map(function_wrapper, args_generator)))
如果我正确理解 pool
的工作原理,第 11 行中指定的进程数应该是在给定时间产生的最大进程数.因此,这应该会限制我的 CPU 使用,对吗?我的意思是,按照我的理解,第11行的设置,最大使用的processes/CPUs数应该是[2, number_of_cpus / 10]
.
的最大值
尽管如此,当我 运行 我的代码时,我发现在我开始后不久,所有 CPU 都处于 100%。我错过了什么吗?
注意:对于上下文,我需要将我的CPU使用限制为最大内核数,因为我将使用共享服务器。
更新:添加我的代码的修剪版本。我没有打开 fits
文件,而是创建了一条类似于我的光谱的嘈杂高斯曲线(尽管表现更好......)。
修剪它有助于解决问题。在函数 fnBootstrapInstance
中,拟合是在我使用 for loop
迭代的二维数组(基本上是阶梯形光谱)上执行的。出于某种原因,删除循环,解决了问题,并且只使用了我指定的内核数。我的猜测是,出于某种原因,for 循环产生了一系列子进程(这就是它在 htop
上出现的方式)。一次迭代一个阶的 ecehelles 光谱解决了这个问题。
# Imports
#%matplotlib inline
import sys
import numpy as np
import matplotlib.pyplot as mplt
import numpy.random as rnd
import scipy.optimize as opt
import multiprocessing as mproc
# Functions ==================================================
def fnBootstrapInstance(XXX = None, YYY= None, function= None, lenght=None, fitBounds= None, initParams=None, **kwargs):
# define samples
indexes = sorted(rnd.choice(range(len(XXX)), size=lenght, replace=True))
samplesXXX = XXX[indexes]
samplesYYY = YYY[indexes]
fitBounds = ([-np.inf,-np.inf,0,-np.inf],[np.inf,np.inf,np.inf,np.inf])
params, cov = opt.curve_fit(function, samplesXXX.ravel(), samplesYYY.ravel(), p0=initParams,
bounds = fitBounds,
)
return params
def wrapper_fnBootstrapInstance(args):
return fnBootstrapInstance(**args)
def fnGaussian(dataXXX, Amp, mean, FWHM, B):
return B - Amp * np.exp(-4 * np.log(2) * (((dataXXX - mean) / FWHM) ** 2))
# Functions ==================================================
# Noise Parameters
arrLen = 1000
noiseAmp = 0.
noiseSTD = .25
# Gaussian Data Parameters
amp = 1.
mean = 10
FWHM = 30.
B = 1.
# generate random gauss data
arrGaussXXX = np.linspace(-50, 60,num = arrLen)
arrGaussNoise = rnd.normal(noiseAmp,noiseSTD, arrLen)
arrGaussYYY = fnGaussian(arrGaussXXX, amp, mean, FWHM, B) + arrGaussNoise
# multiprocessing bit
numIterations = 1000
mprocPool = mproc.Pool(processes=(max([2, mproc.cpu_count() // 10])))
initParams = [max(arrGaussYYY) - min(arrGaussYYY), np.median(arrGaussXXX),
max(arrGaussXXX) - min(arrGaussXXX), max(arrGaussYYY)]
args_generator = [{'XXX':arrGaussXXX, 'YYY':arrGaussYYY, 'function':fnGaussian, 'initParams':initParams,
'lenght':200} for n in range(numIterations)]
fitParams = []
for results in mprocPool.imap(wrapper_fnBootstrapInstance, args_generator):
fitParams.append([results[0],results[1],results[2],results[3]])
bootParams = [(np.nanmedian(param),np.nanstd(param)) for param in np.array(fitParams).T]
print '\n'.join('{:.2f}+-{:.2f} ({:.1f}%)'.format(param[0],param[1], param[1]/param[0]*100) for param in bootParams)
mplt.figure(figsize=(20,10))
mplt.plot(arrGaussXXX, arrGaussYYY,'+')
for params in fitParams:
mplt.plot(arrGaussXXX,fnGaussian(arrGaussXXX,*params),'r', alpha = .5)
mplt.show()
mprocPool.close()
谢谢大家!
考虑使用 multiprocessing.pool.ThreadPool
。它提供与 multiprocessing.Pool
相同的 API,但将工作负载抽象为线程集合。请注意,如果您的 CPU 支持超线程,那么它很可能会将工作负载分配到物理内核上。
感谢
1| def function(name, params):
2| results = fits.open(name)
3| <do something more to results>
4| return results
5|
6| def function_wrapper(args):
7| return function(*args)
8|
9| params = [...,...,..., etc]
10|
11| p = multiprocessing..Pool(processes=(max([2, mproc.cpu_count() // 10])))
12| args_generator = ((name, params) for name in names)
13|
14| dictionary = dict(zip(names, p.map(function_wrapper, args_generator)))
如果我正确理解 pool
的工作原理,第 11 行中指定的进程数应该是在给定时间产生的最大进程数.因此,这应该会限制我的 CPU 使用,对吗?我的意思是,按照我的理解,第11行的设置,最大使用的processes/CPUs数应该是[2, number_of_cpus / 10]
.
尽管如此,当我 运行 我的代码时,我发现在我开始后不久,所有 CPU 都处于 100%。我错过了什么吗?
注意:对于上下文,我需要将我的CPU使用限制为最大内核数,因为我将使用共享服务器。
更新:添加我的代码的修剪版本。我没有打开 fits
文件,而是创建了一条类似于我的光谱的嘈杂高斯曲线(尽管表现更好......)。
修剪它有助于解决问题。在函数 fnBootstrapInstance
中,拟合是在我使用 for loop
迭代的二维数组(基本上是阶梯形光谱)上执行的。出于某种原因,删除循环,解决了问题,并且只使用了我指定的内核数。我的猜测是,出于某种原因,for 循环产生了一系列子进程(这就是它在 htop
上出现的方式)。一次迭代一个阶的 ecehelles 光谱解决了这个问题。
# Imports
#%matplotlib inline
import sys
import numpy as np
import matplotlib.pyplot as mplt
import numpy.random as rnd
import scipy.optimize as opt
import multiprocessing as mproc
# Functions ==================================================
def fnBootstrapInstance(XXX = None, YYY= None, function= None, lenght=None, fitBounds= None, initParams=None, **kwargs):
# define samples
indexes = sorted(rnd.choice(range(len(XXX)), size=lenght, replace=True))
samplesXXX = XXX[indexes]
samplesYYY = YYY[indexes]
fitBounds = ([-np.inf,-np.inf,0,-np.inf],[np.inf,np.inf,np.inf,np.inf])
params, cov = opt.curve_fit(function, samplesXXX.ravel(), samplesYYY.ravel(), p0=initParams,
bounds = fitBounds,
)
return params
def wrapper_fnBootstrapInstance(args):
return fnBootstrapInstance(**args)
def fnGaussian(dataXXX, Amp, mean, FWHM, B):
return B - Amp * np.exp(-4 * np.log(2) * (((dataXXX - mean) / FWHM) ** 2))
# Functions ==================================================
# Noise Parameters
arrLen = 1000
noiseAmp = 0.
noiseSTD = .25
# Gaussian Data Parameters
amp = 1.
mean = 10
FWHM = 30.
B = 1.
# generate random gauss data
arrGaussXXX = np.linspace(-50, 60,num = arrLen)
arrGaussNoise = rnd.normal(noiseAmp,noiseSTD, arrLen)
arrGaussYYY = fnGaussian(arrGaussXXX, amp, mean, FWHM, B) + arrGaussNoise
# multiprocessing bit
numIterations = 1000
mprocPool = mproc.Pool(processes=(max([2, mproc.cpu_count() // 10])))
initParams = [max(arrGaussYYY) - min(arrGaussYYY), np.median(arrGaussXXX),
max(arrGaussXXX) - min(arrGaussXXX), max(arrGaussYYY)]
args_generator = [{'XXX':arrGaussXXX, 'YYY':arrGaussYYY, 'function':fnGaussian, 'initParams':initParams,
'lenght':200} for n in range(numIterations)]
fitParams = []
for results in mprocPool.imap(wrapper_fnBootstrapInstance, args_generator):
fitParams.append([results[0],results[1],results[2],results[3]])
bootParams = [(np.nanmedian(param),np.nanstd(param)) for param in np.array(fitParams).T]
print '\n'.join('{:.2f}+-{:.2f} ({:.1f}%)'.format(param[0],param[1], param[1]/param[0]*100) for param in bootParams)
mplt.figure(figsize=(20,10))
mplt.plot(arrGaussXXX, arrGaussYYY,'+')
for params in fitParams:
mplt.plot(arrGaussXXX,fnGaussian(arrGaussXXX,*params),'r', alpha = .5)
mplt.show()
mprocPool.close()
谢谢大家!
考虑使用 multiprocessing.pool.ThreadPool
。它提供与 multiprocessing.Pool
相同的 API,但将工作负载抽象为线程集合。请注意,如果您的 CPU 支持超线程,那么它很可能会将工作负载分配到物理内核上。