如何将交叉验证目标输入管道中的自定义转换器

How to feed cross-validation targets into custom transformers in pipeline

我一直在尝试让一些自定义转换器与 sklearn 的 Pipeline 和 FeatureUnion 类配合工作。我最终想使用 GridSearchCV 来尝试一些不同的参数,但一开始就卡住了。我有以下管道:

# Combine the two custom feature selectors in a single FeatureUnion step.
feature_selection = FeatureUnion(
    transformer_list=[
        ("fprfeatures", SelectFprAttrib()),
        (
            "modelfeatures",
            SelectModelAttrib(
                clf=RandomForestClassifier(n_estimators=150),
                on=True,
            ),
        ),
    ]
)

# Column selection -> scaling -> feature selection -> final estimator(s).
full_pipeline = Pipeline(
    steps=[
        ("dataselector", DataSelector(numcolumns)),
        ("scaler", ScalerFlip()),
        ("features", feature_selection),
        ("estimators", estimator_pipe),
    ]
)

下面是我的一个自定义转换器类的示例(其他几个本质上相同):

# Custom SelectFromModel wrapper that lets me cap the number of attributes
# kept and toggle the whole step on or off.
class SelectModelAttrib(BaseEstimator, TransformerMixin):
    """Select features with ``SelectFromModel``, capped and switchable.

    The underlying selector is fitted in ``fit`` and only *applied* in
    ``transform``.  Re-fitting inside ``transform`` with a ``y`` remembered
    from ``fit`` breaks under cross-validation: ``transform`` is then called
    on the held-out fold, whose number of rows differs from the stored
    training ``y`` ("inconsistent numbers of samples").

    Parameters
    ----------
    clf : estimator
        Estimator handed to ``SelectFromModel``.
    attrib_number : int, default 20
        Upper bound on the number of columns returned; the first
        ``attrib_number`` of the selected columns are kept.
    on : bool, default True
        When False the transformer contributes no columns at all.
    """

    def __init__(self, clf, attrib_number=20, on=True):
        self.attrib_number = attrib_number
        self.clf = clf
        self.on = on

    def fit(self, X, y=None):
        """Fit the wrapped ``SelectFromModel`` on the training data.

        Nothing is fitted when ``on`` is False.  Returns ``self``.
        """
        # Imported here so the name is visible to the method; an import placed
        # in the class body would only create a class attribute.
        from sklearn.feature_selection import SelectFromModel

        if self.on:
            self.model = SelectFromModel(self.clf).fit(X, y)
        return self

    def transform(self, X):
        """Return the selected columns of ``X``, at most ``attrib_number``.

        Uses the selector fitted in ``fit``; ``X`` may therefore have any
        number of rows.  When ``on`` is False an array of shape
        ``(n_samples, 0)`` is returned, so the step adds no features (rather
        than uninitialised memory shaped like ``X``).
        """
        if not self.on:
            return np.empty((np.shape(X)[0], 0))
        return self.model.transform(X)[:, :self.attrib_number]

    def get_support(self):
        """Return the support mask of the fitted ``SelectFromModel``.

        The mask describes the selector's full selection; it does not
        reflect the ``attrib_number`` cap applied in ``transform``.  Only
        available after ``fit`` has run with ``on=True``.
        """
        return self.model.get_support()

如果我调用

# Fit the whole pipeline directly on the data (no cross-validation involved).
full_pipeline.fit(features, targets)

就没有问题。事实上,如果我注释掉估计器(estimators)步骤并运行以下内容:

# With the final "estimators" step commented out, fit the transformers and
# return the transformed feature array.
full_pipeline.fit_transform(features, targets)

我就能得到想要的特征数组。但是,当我通过 GridSearchCV 运行 full_pipeline 时:

# X is laid out as rows=instances, columns=features.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# NOTE(review): presumably a scipy.stats distribution object; it is not
# referenced again in this snippet.
attrib_number = ss.randint(1, 100)

# Candidate estimators to be swapped into the pipeline's "estimators" step.
param_grid = {
    "estimators": [
        SVC(kernel="rbf"),
        SVC(kernel="poly"),
        LinearSVC(),
        LogisticRegression(),
    ],
}

# 4-fold cross-validated search over the grid, scored by accuracy.
pipe_grd = GridSearchCV(
    full_pipeline,
    param_grid,
    cv=4,
    scoring="accuracy",
    verbose=2,
)
pipe_grd.fit(X_train, y_train)

我得到以下回溯....

ValueError                                Traceback (most recent call last)
<ipython-input-72-86f7cf8d7839> in <module>()
     16 
     17 pipe_grd = GridSearchCV(full_pipeline, param_grid, cv=4, scoring = "accuracy", verbose=2, n_jobs=1)
---> 18 pipe_grd.fit(X_train, y_train)
     19 #full_pipeline.predict(X_test)

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\model_selection\_search.pyc in fit(self, X, y, groups)
    943             train/test set.
    944         """
--> 945         return self._fit(X, y, groups, ParameterGrid(self.param_grid))
    946 
    947 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\model_selection\_search.pyc in _fit(self, X, y, groups, parameter_iterable)
    562                                   return_times=True, return_parameters=True,
    563                                   error_score=self.error_score)
--> 564           for parameters in parameter_iterable
    565           for train, test in cv_iter)
    566 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in __call__(self, iterable)
    756             # was dispatched. In particular this covers the edge
    757             # case of Parallel used with an exhausted iterator.
--> 758             while self.dispatch_one_batch(iterator):
    759                 self._iterating = True
    760             else:

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in dispatch_one_batch(self, iterator)
    606                 return False
    607             else:
--> 608                 self._dispatch(tasks)
    609                 return True
    610 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in _dispatch(self, batch)
    569         dispatch_timestamp = time.time()
    570         cb = BatchCompletionCallBack(dispatch_timestamp, len(batch), self)
--> 571         job = self._backend.apply_async(batch, callback=cb)
    572         self._jobs.append(job)
    573 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\_parallel_backends.pyc in apply_async(self, func, callback)
    107     def apply_async(self, func, callback=None):
    108         """Schedule a func to be run"""
--> 109         result = ImmediateResult(func)
    110         if callback:
    111             callback(result)

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\_parallel_backends.pyc in __init__(self, batch)
    324         # Don't delay the application, to avoid keeping the input
    325         # arguments in memory
--> 326         self.results = batch()
    327 
    328     def get(self):

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in __call__(self)
    129 
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
    132 
    133     def __len__(self):

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\model_selection\_validation.pyc in _fit_and_score(estimator, X, y, scorer, train, test, verbose, parameters, fit_params, return_train_score, return_parameters, return_n_test_samples, return_times, error_score)
    258     else:
    259         fit_time = time.time() - start_time
--> 260         test_score = _score(estimator, X_test, y_test, scorer)
    261         score_time = time.time() - start_time - fit_time
    262         if return_train_score:

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\model_selection\_validation.pyc in _score(estimator, X_test, y_test, scorer)
    286         score = scorer(estimator, X_test)
    287     else:
--> 288         score = scorer(estimator, X_test, y_test)
    289     if hasattr(score, 'item'):
    290         try:

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\metrics\scorer.pyc in __call__(self, estimator, X, y_true, sample_weight)
     89         super(_PredictScorer, self).__call__(estimator, X, y_true,
     90                                              sample_weight=sample_weight)
---> 91         y_pred = estimator.predict(X)
     92         if sample_weight is not None:
     93             return self._sign * self._score_func(y_true, y_pred,

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\utils\metaestimators.pyc in <lambda>(*args, **kwargs)
     52 
     53         # lambda, but not partial, allows help() to work with update_wrapper
---> 54         out = lambda *args, **kwargs: self.fn(obj, *args, **kwargs)
     55         # update the docstring of the returned function
     56         update_wrapper(out, self.fn)

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\pipeline.pyc in predict(self, X)
    324         for name, transform in self.steps[:-1]:
    325             if transform is not None:
--> 326                 Xt = transform.transform(Xt)
    327         return self.steps[-1][-1].predict(Xt)
    328 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\pipeline.pyc in transform(self, X)
    761         Xs = Parallel(n_jobs=self.n_jobs)(
    762             delayed(_transform_one)(trans, name, weight, X)
--> 763             for name, trans, weight in self._iter())
    764         if not Xs:
    765             # All transformers are None

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in __call__(self, iterable)
    756             # was dispatched. In particular this covers the edge
    757             # case of Parallel used with an exhausted iterator.
--> 758             while self.dispatch_one_batch(iterator):
    759                 self._iterating = True
    760             else:

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in dispatch_one_batch(self, iterator)
    606                 return False
    607             else:
--> 608                 self._dispatch(tasks)
    609                 return True
    610 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in _dispatch(self, batch)
    569         dispatch_timestamp = time.time()
    570         cb = BatchCompletionCallBack(dispatch_timestamp, len(batch), self)
--> 571         job = self._backend.apply_async(batch, callback=cb)
    572         self._jobs.append(job)
    573 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\_parallel_backends.pyc in apply_async(self, func, callback)
    107     def apply_async(self, func, callback=None):
    108         """Schedule a func to be run"""
--> 109         result = ImmediateResult(func)
    110         if callback:
    111             callback(result)

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\_parallel_backends.pyc in __init__(self, batch)
    324         # Don't delay the application, to avoid keeping the input
    325         # arguments in memory
--> 326         self.results = batch()
    327 
    328     def get(self):

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\externals\joblib\parallel.pyc in __call__(self)
    129 
    130     def __call__(self):
--> 131         return [func(*args, **kwargs) for func, args, kwargs in self.items]
    132 
    133     def __len__(self):

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\pipeline.pyc in _transform_one(transformer, name, weight, X)
    565 
    566 def _transform_one(transformer, name, weight, X):
--> 567     res = transformer.transform(X)
    568     # if we have a weight for this transformer, multiply output
    569     if weight is None:

<ipython-input-64-e7d7de2d62c1> in transform(self, X)
     37         if self.on:
     38             self.fpr = SelectFpr()
---> 39             return self.fpr.fit_transform(X,self.y)
     40         else:
     41             return np.empty_like(X)

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\base.pyc in fit_transform(self, X, y, **fit_params)
    495         else:
    496             # fit method of arity 2 (supervised transformation)
--> 497             return self.fit(X, y, **fit_params).transform(X)
    498 
    499 

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\feature_selection\univariate_selection.pyc in fit(self, X, y)
    320             Returns self.
    321         """
--> 322         X, y = check_X_y(X, y, ['csr', 'csc'], multi_output=True)
    323 
    324         if not callable(self.score_func):

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\utils\validation.pyc in check_X_y(X, y, accept_sparse, dtype, order, copy, force_all_finite, ensure_2d, allow_nd, multi_output, ensure_min_samples, ensure_min_features, y_numeric, warn_on_dtype, estimator)
    529         y = y.astype(np.float64)
    530 
--> 531     check_consistent_length(X, y)
    532 
    533     return X, y

C:\Users\philg\Anaconda2\lib\site-packages\sklearn\utils\validation.pyc in check_consistent_length(*arrays)
    179     if len(uniques) > 1:
    180         raise ValueError("Found input variables with inconsistent numbers of"
--> 181                          " samples: %r" % [int(l) for l in lengths])
    182 
    183 

ValueError: Found input variables with inconsistent numbers of samples: [14, 38]

根据我在管道中插入 print 语句收集到的信息,问题出在自定义转换器上。我用 `self.y = y` 做了一些可能有点取巧(hacky)的事情。

据我所知,当 GridSearchCV 开始进行交叉验证时,会发生以下情况[示例]:

#Non-cv X.shape is (65,700)
#DataSelector is called 
X.shape = (38, 700) 
y.shape =(38L,)
#It passes to SelectModelAttrib the same shapes
X.shape = (38, 500)
y.shape = (38L,)
#DataSelector is called a second time
X.shape = (14, 700)
y.shape = (38L,)
#error occurs

错误就出在这里……有什么方法可以让 y 在此管道中随 X 一起更新吗?如果我用 sklearn 自带的 SelectFromModel 类替换我的自定义转换器,整个流程就能正常运行。他们是怎么做到的?我查看了他们的源代码,但没能看懂。

我不敢相信我花了一整天试图弄清楚这个问题,只是在试图找出替代方案时偶然发现了答案。

答案在此处(here)。感谢 @Vivek Kumar 为我指明了方向。

本质上,就是把 sklearn 自带的转换器作为父类,让自定义类去继承它。然后我从源码(此处 here 和此处 here)中复制粘贴了相关代码,并添加了我想要的额外功能 [attrib_number、开关 toggle]。

例如,以下自定义类在之前那个类无法工作的场景下可以正常工作(不再使用只起作用一次的 self.y)。

from sklearn.utils import check_X_y
from sklearn.feature_selection import f_classif
from sklearn.utils import check_array, safe_mask
from warnings import warn

class SelectFprCustom(SelectFpr):
    """``SelectFpr`` with a cap on the columns kept and an on/off switch.

    Scores are computed in ``fit`` from the training data only, so
    ``transform`` can be applied to data with any number of rows (e.g. the
    held-out fold during cross-validation).

    Parameters
    ----------
    score_func : callable, default f_classif
        Called as ``score_func(X, y)``; returns either scores alone or a
        ``(scores, pvalues)`` pair.
    attrib_number : int, default 20
        Upper bound on the number of columns returned by ``transform``.
    on : bool, default True
        When False, ``transform`` returns no columns.
    alpha : float, default 0.05
        Forwarded to ``SelectFpr``.  Exposed as a constructor argument so it
        is a regular, tunable parameter instead of a hard-coded constant.
    """

    def __init__(self, score_func=f_classif, attrib_number=20, on=True,
                 alpha=0.05):
        super(SelectFprCustom, self).__init__(score_func=score_func,
                                              alpha=alpha)
        self.attrib_number = attrib_number
        self.on = on

    def fit(self, X, y):
        """Run ``score_func`` on ``(X, y)`` and store scores and p-values.

        Sets ``scores_`` and ``pvalues_`` (``pvalues_`` is None when
        ``score_func`` returns scores only).  Returns ``self``.

        Raises
        ------
        TypeError
            If ``score_func`` is not callable.
        """
        # NOTE(review): this body was copied from the parent class's source;
        # it looks equivalent to the inherited fit -- confirm before removing.
        X, y = check_X_y(X, y, ['csr', 'csc'], multi_output=True)

        if not callable(self.score_func):
            raise TypeError("The score function should be a callable, %s (%s) "
                            "was passed."
                            % (self.score_func, type(self.score_func)))

        self._check_params(X, y)
        score_func_ret = self.score_func(X, y)
        if isinstance(score_func_ret, (list, tuple)):
            self.scores_, self.pvalues_ = score_func_ret
            self.pvalues_ = np.asarray(self.pvalues_)
        else:
            self.scores_ = score_func_ret
            self.pvalues_ = None

        self.scores_ = np.asarray(self.scores_)
        return self

    def transform(self, X):
        """Return the selected columns of ``X``, at most ``attrib_number``.

        An array of shape ``(n_samples, 0)`` is returned, with a
        ``UserWarning``, when the selector is switched off or when no
        feature passed the selection test.

        Raises
        ------
        ValueError
            If ``X`` does not have the number of columns seen during fit.
        """
        X = check_array(X, accept_sparse='csr')

        if not self.on:
            warn("No features were selected: on=False.", UserWarning)
            return np.empty((X.shape[0], 0))

        mask = self.get_support()
        if not mask.any():
            warn("No features were selected: either the data is"
                 " too noisy or the selection test too strict.",
                 UserWarning)
            return np.empty((X.shape[0], 0))
        if len(mask) != X.shape[1]:
            raise ValueError("X has a different shape than during fitting.")

        # NOTE(review): the cap keeps the first ``attrib_number`` selected
        # columns in their original column order, not the best-scoring ones
        # -- confirm this is the intent.
        return X[:, safe_mask(X, mask)][:, :self.attrib_number]

    def _check_params(self, X, y):
        """Hook called from ``fit``; performs no extra validation here."""
        pass