thread.lock 在自定义参数搜索期间 class 使用 Dask 分布式
thread.lock during custom parameter search class using Dask distributed
我编写了自己的参数搜索实现,主要是因为 我不需要 scikit-learn 的 GridSearch 和 RandomizedSearch 的交叉验证。
我使用 dask 来提供最佳的分布式性能。
这是我的:
from scipy.stats import uniform
class Params(object):
def __init__(self,fixed,loc=0.0,scale=1.0):
self.fixed=fixed
self.sched=uniform(loc=loc,scale=scale)
def _getsched(self,i,size):
return self.sched.rvs(size=size,random_state=i)
def param(self,i,size=None):
tmp=self.fixed.copy()
if size is None:
size=tmp['niter']
tmp.update({'schd':self._getsched(i,size)})
return tmp
class Mymodel(object):
def __init__(self,func,params_object,score,ntries,client):
self.params=params_object
self.func=func
self.score=score
self.ntries=ntries
self.client=client
def _run(self,params,train,test):
return self.func(params,train,test,self.score)
def build(self,train,test):
res=[]
for i in range(self.ntries):
cparam=self.params.param(i)
res.append( (cparam, self.client.submit(self._run, cparam, train,test)) )
self._results=res
return res
def compute_optimal(self,res=None):
from operator import itemgetter
if res is None:
res=self._results
self._sorted=sorted(self.client.compute(res),key=itemgetter(1))
return self._sorted[0]
def score(test,correct):
return np.linalg.norm(test-correct)
def myfunc(params,ldata,data,score):
schd=params['schd']
niter=len(schd)
#here I do some magic after which ldata is changing
return score(test=ldata,correct=data)
我开始后dask.distributed:
from distributed import Client
scheduler_host='myhostname:8786'
cli=Client(scheduler_host)
我运行是这样的:
%%time
params=Params({'niter':50},loc=1.0e-06,scale=1.0)
model=Mymodel(myfunc,params,score,100,cli)
ptdata=bad_data_example.copy()
graph=model.build(ptdata,good_data)
得到这个:
distributed.protocol.pickle - INFO - Failed to serialize
<bound method Mymodel._run of <__main__.Mymodel object at 0x2b8961903050>>.
Exception: can't pickle thread.lock objects
你能帮我了解这是怎么回事以及如何解决这个问题吗?
我也很好奇我如何在所有参数结果中找到最小值。 用 Dask 有更好的方法吗?
我写这段代码的速度相当快,从未尝试过串行。
我正在学习 Dask 以及许多其他主题(机器学习、gpu 编程、Numba、Python OOP 等),所以这段代码无论如何都不是最佳的...
P.S。为了实际执行它,我使用了这个调用:model.compute_optimal()
。还没到这里 - 由于上面的错误。
看起来主要问题是由于我试图映射一个函数的方法。 joblib
我也有类似的问题。所以我重新编码了问题并删除了所有 类.
以下关于优化的问题发布在这里:Parameter search using dask
我肯定会在我的工作中使用 dask-searchcv
- 当我需要交叉验证时 - 但现在它实际上只是对最佳解决方案的简单搜索 - 所以必须创建我自己的实现。 ..
解决眼前的问题:Mymodel
有一个属性 client
,因为无法序列化客户端。如果必须,请使用 distributed.get_client
而不是 client
作为属性。
I'll definetely use dask-searchcv in my work - when I'll need cross-validation - but for now it's really only a simple search for an optimal solution - so had to create my own implementation...
Dask-ML 也有很多超参数搜索功能。这是一个很好的概述:https://ml.dask.org/hyper-parameter-search.html
默认情况下,许多此类搜索不进行大量交叉验证,因为它们假定数据很大(请参阅 IncrementalSearchCV
)。其中一些搜索具有减少计算量的奇特方法(参见 HyperbandSearchCV
)。
我编写了自己的参数搜索实现,主要是因为 我不需要 scikit-learn 的 GridSearch 和 RandomizedSearch 的交叉验证。
我使用 dask 来提供最佳的分布式性能。
这是我的:
from scipy.stats import uniform
class Params(object):
def __init__(self,fixed,loc=0.0,scale=1.0):
self.fixed=fixed
self.sched=uniform(loc=loc,scale=scale)
def _getsched(self,i,size):
return self.sched.rvs(size=size,random_state=i)
def param(self,i,size=None):
tmp=self.fixed.copy()
if size is None:
size=tmp['niter']
tmp.update({'schd':self._getsched(i,size)})
return tmp
class Mymodel(object):
def __init__(self,func,params_object,score,ntries,client):
self.params=params_object
self.func=func
self.score=score
self.ntries=ntries
self.client=client
def _run(self,params,train,test):
return self.func(params,train,test,self.score)
def build(self,train,test):
res=[]
for i in range(self.ntries):
cparam=self.params.param(i)
res.append( (cparam, self.client.submit(self._run, cparam, train,test)) )
self._results=res
return res
def compute_optimal(self,res=None):
from operator import itemgetter
if res is None:
res=self._results
self._sorted=sorted(self.client.compute(res),key=itemgetter(1))
return self._sorted[0]
def score(test,correct):
return np.linalg.norm(test-correct)
def myfunc(params,ldata,data,score):
schd=params['schd']
niter=len(schd)
#here I do some magic after which ldata is changing
return score(test=ldata,correct=data)
我开始后dask.distributed:
from distributed import Client
scheduler_host='myhostname:8786'
cli=Client(scheduler_host)
我运行是这样的:
%%time
params=Params({'niter':50},loc=1.0e-06,scale=1.0)
model=Mymodel(myfunc,params,score,100,cli)
ptdata=bad_data_example.copy()
graph=model.build(ptdata,good_data)
得到这个:
distributed.protocol.pickle - INFO - Failed to serialize
<bound method Mymodel._run of <__main__.Mymodel object at 0x2b8961903050>>.
Exception: can't pickle thread.lock objects
你能帮我了解这是怎么回事以及如何解决这个问题吗?
我也很好奇我如何在所有参数结果中找到最小值。 用 Dask 有更好的方法吗?
我写这段代码的速度相当快,从未尝试过串行。 我正在学习 Dask 以及许多其他主题(机器学习、gpu 编程、Numba、Python OOP 等),所以这段代码无论如何都不是最佳的...
P.S。为了实际执行它,我使用了这个调用:model.compute_optimal()
。还没到这里 - 由于上面的错误。
看起来主要问题是由于我试图映射一个函数的方法。 joblib
我也有类似的问题。所以我重新编码了问题并删除了所有 类.
以下关于优化的问题发布在这里:Parameter search using dask
我肯定会在我的工作中使用 dask-searchcv
- 当我需要交叉验证时 - 但现在它实际上只是对最佳解决方案的简单搜索 - 所以必须创建我自己的实现。 ..
解决眼前的问题:Mymodel
有一个属性 client
,因为无法序列化客户端。如果必须,请使用 distributed.get_client
而不是 client
作为属性。
I'll definetely use dask-searchcv in my work - when I'll need cross-validation - but for now it's really only a simple search for an optimal solution - so had to create my own implementation...
Dask-ML 也有很多超参数搜索功能。这是一个很好的概述:https://ml.dask.org/hyper-parameter-search.html
默认情况下,许多此类搜索不进行大量交叉验证,因为它们假定数据很大(请参阅 IncrementalSearchCV
)。其中一些搜索具有减少计算量的奇特方法(参见 HyperbandSearchCV
)。