Python 多处理块在 waiter.acquire() 中不确定
Python multiprocessing blocks indefinately in waiter.acquire()
有人可以解释为什么这段代码会阻塞并且无法完成吗?
我遵循了几个 multiprocessing
的示例,并且我编写了一些非常相似的代码,这些代码不会被阻止。但是,显然,我看不出该工作代码与下面的代码有什么区别。我认为一切都很好。它一直到 .get(),但是 none 的进程完成了。
问题是 python3 在 waiter.acquire() 中无限期阻塞,您可以通过中断它并阅读回溯来判断。
$ python3 ./try415.py
^CTraceback (most recent call last):
File "./try415.py", line 43, in <module>
ps = [ res.get() for res in proclist ]
File "./try415.py", line 43, in <listcomp>
ps = [ res.get() for res in proclist ]
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 638, in get
self.wait(timeout)
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 635, in wait
self._event.wait(timeout)
File "/usr/lib64/python3.6/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/usr/lib64/python3.6/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
这是代码
from multiprocessing import Pool
from scipy import optimize
import numpy as np
def func(t, a, b, c):
return 0.5*a*t**2 + b*t + c
def funcwrap(t, params):
return func(t, *params)
def fitWithErr(procid, yFitValues, simga, func, p0, args, bounds):
np.random.seed() # force new seed
randomDelta = np.random.normal(0., sigma, len(yFitValues))
randomdataY = yFitValues + randomDelta
errfunc = lambda p, x, y: func(p, x) -y
optResult = optimize.least_squares(errfunc, p0, args=args, bounds=bounds)
return optResult.x
def fit_bootstrap(function, datax, datay, p0, bounds, aprioriUnc):
errfunc = lambda p, x, y: function(x,p) - y
optResult = optimize.least_squares(errfunc, x0=p0, args=(datax, datay), bounds=bounds)
pfit = optResult.x
residuals = optResult.fun
fity = function(datax, pfit)
numParallelProcesses = 2**2 # should be equal to number of ALUs
numTrials = 2**2 # this many random data sets are generated and fitted
trialParameterList = list()
for i in range(0,numTrials):
trialParameterList.append( [i, fity, aprioriUnc, function, p0, (datax, datay), bounds] )
with Pool(processes=numParallelProcesses) as pool:
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
ps = [ res.get() for res in proclist ]
ps = np.array(ps)
mean_pfit = np.mean(ps,0)
return mean_pfit
if __name__ == '__main__':
x = np.linspace(0,3,2000)
p0 = [-9.81, 1., 0.]
y = funcwrap(x, p0)
bounds = [ (-20,-1., -1E-6),(20,3,1E-6) ]
fit_bootstrap(funcwrap, x, y, p0, bounds=bounds, aprioriUnc=0.1)
抱歉回答错误。不验证它是非常不负责任的。这是我的回答。
with Pool(processes=numParallelProcesses) as pool:
此行错误,因为将调用 exit 函数而不关闭。这是退出函数体:
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
所有的进程都将终止,永远不会执行。
代码:
ps = [ res.get() for res in proclist ]
没有超时参数。这是 get 函数体:
def get(self, timeout=None):
self.wait(timeout)
if not self.ready():
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
没有超时就一直等待。这就是它挂起的原因。
你需要改变
with Pool(processes=numParallelProcesses) as pool:
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
至:
pool=Pool(processes=numParallelProcesses)
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
pool.close()
缩进
毕竟,我只是没有意识到某些代码不在应该在的 with
子句中。 (除了一些拼写错误和其他错误,我现在已经修复了这些错误。)间奏曲再次来袭!
感谢 Snowy 让我以不同的方式完成它,直到我发现我的错误。我只是不清楚我打算做什么。 Snowy 的颂歌是完全有效且等效的代码。但是,郑重声明,timeout
不是 必需的。而且,更重要的是,with
对进程 完全有效,如果 您正确使用它,如 Python3.6.6 [=14 的第一段所示=] 文档,这是我得到它的地方。我只是搞砸了,不知何故。我尝试编写的代码很简单:
with Pool(processes=numParallelProcesses) as pool:
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
ps = [ res.get() for res in proclist ]
ps = np.array(ps)
mean_pfit = np.mean(ps,0)
像我预期的那样工作。
有人可以解释为什么这段代码会阻塞并且无法完成吗?
我遵循了几个 multiprocessing
的示例,并且我编写了一些非常相似的代码,这些代码不会被阻止。但是,显然,我看不出该工作代码与下面的代码有什么区别。我认为一切都很好。它一直到 .get(),但是 none 的进程完成了。
问题是 python3 在 waiter.acquire() 中无限期阻塞,您可以通过中断它并阅读回溯来判断。
$ python3 ./try415.py
^CTraceback (most recent call last):
File "./try415.py", line 43, in <module>
ps = [ res.get() for res in proclist ]
File "./try415.py", line 43, in <listcomp>
ps = [ res.get() for res in proclist ]
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 638, in get
self.wait(timeout)
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 635, in wait
self._event.wait(timeout)
File "/usr/lib64/python3.6/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/usr/lib64/python3.6/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
这是代码
from multiprocessing import Pool
from scipy import optimize
import numpy as np
def func(t, a, b, c):
return 0.5*a*t**2 + b*t + c
def funcwrap(t, params):
return func(t, *params)
def fitWithErr(procid, yFitValues, simga, func, p0, args, bounds):
np.random.seed() # force new seed
randomDelta = np.random.normal(0., sigma, len(yFitValues))
randomdataY = yFitValues + randomDelta
errfunc = lambda p, x, y: func(p, x) -y
optResult = optimize.least_squares(errfunc, p0, args=args, bounds=bounds)
return optResult.x
def fit_bootstrap(function, datax, datay, p0, bounds, aprioriUnc):
errfunc = lambda p, x, y: function(x,p) - y
optResult = optimize.least_squares(errfunc, x0=p0, args=(datax, datay), bounds=bounds)
pfit = optResult.x
residuals = optResult.fun
fity = function(datax, pfit)
numParallelProcesses = 2**2 # should be equal to number of ALUs
numTrials = 2**2 # this many random data sets are generated and fitted
trialParameterList = list()
for i in range(0,numTrials):
trialParameterList.append( [i, fity, aprioriUnc, function, p0, (datax, datay), bounds] )
with Pool(processes=numParallelProcesses) as pool:
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
ps = [ res.get() for res in proclist ]
ps = np.array(ps)
mean_pfit = np.mean(ps,0)
return mean_pfit
if __name__ == '__main__':
x = np.linspace(0,3,2000)
p0 = [-9.81, 1., 0.]
y = funcwrap(x, p0)
bounds = [ (-20,-1., -1E-6),(20,3,1E-6) ]
fit_bootstrap(funcwrap, x, y, p0, bounds=bounds, aprioriUnc=0.1)
抱歉回答错误。不验证它是非常不负责任的。这是我的回答。
with Pool(processes=numParallelProcesses) as pool:
此行错误,因为将调用 exit 函数而不关闭。这是退出函数体:
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
所有的进程都将终止,永远不会执行。 代码:
ps = [ res.get() for res in proclist ]
没有超时参数。这是 get 函数体:
def get(self, timeout=None):
self.wait(timeout)
if not self.ready():
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
没有超时就一直等待。这就是它挂起的原因。
你需要改变
with Pool(processes=numParallelProcesses) as pool:
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
至:
pool=Pool(processes=numParallelProcesses)
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
pool.close()
缩进
毕竟,我只是没有意识到某些代码不在应该在的 with
子句中。 (除了一些拼写错误和其他错误,我现在已经修复了这些错误。)间奏曲再次来袭!
感谢 Snowy 让我以不同的方式完成它,直到我发现我的错误。我只是不清楚我打算做什么。 Snowy 的颂歌是完全有效且等效的代码。但是,郑重声明,timeout
不是 必需的。而且,更重要的是,with
对进程 完全有效,如果 您正确使用它,如 Python3.6.6 [=14 的第一段所示=] 文档,这是我得到它的地方。我只是搞砸了,不知何故。我尝试编写的代码很简单:
with Pool(processes=numParallelProcesses) as pool:
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
ps = [ res.get() for res in proclist ]
ps = np.array(ps)
mean_pfit = np.mean(ps,0)
像我预期的那样工作。