在 sklearn 管道中对分类变量实施 KNN 插补

Implementing KNN imputation on categorical variables in an sklearn pipeline

我正在使用 sklearn 的管道转换器实现预处理管道。我的管道包括 sklearn 的 KNNImputer 估计器,我想用它来估算数据集中的分类特征。 (我的问题类似于这个线程,但它不包含我的问题的答案:

我知道必须在插补之前对分类特征进行编码,这就是我遇到麻烦的地方。使用标准 label/ordinal/onehot 编码器,当尝试对具有缺失值 (np.nan) 的分类特征进行编码时,您会收到以下错误:

ValueError: Input contains NaN

我设法通过创建一个自定义编码器来“绕过”它,我将 np.nan 替换为 'Missing':

class CustomEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.encoder = None

    def fit(self, X, y=None):
        self.encoder = OrdinalEncoder()
        return self.encoder.fit(X.fillna('Missing'))

    def transform(self, X, y=None):
        return self.encoder.transform(X.fillna('Missing'))

    def fit_transform(self, X, y=None, **fit_params):
        self.encoder = OrdinalEncoder()
        return self.encoder.fit_transform(X.fillna('Missing'))

preprocessor = ColumnTransformer([
    ('categoricals', CustomEncoder(), cat_features),
    ('numericals', StandardScaler(), num_features)],
    remainder='passthrough'
)

pipeline = Pipeline([
    ('preprocessing', preprocessor),
    ('imputing', KNNImputer(n_neighbors=5))
])

然而,在这种情况下,我找不到合理的方法在使用 KNNImputer 进行输入之前将编码的 'Missing' 值设置回 np.nan。

我读到我可以在这个线程上使用 OneHotEncoder 转换器手动执行此操作:,但同样,我想在管道中实现所有这些以自动化整个预处理阶段。

有人成功做到了吗?有人会推荐替代解决方案吗?使用 KNN 算法进行估算可能不值得麻烦,我应该改用简单的估算器吗?

提前感谢您的反馈!

恐怕这行不通。如果您对分类数据进行单热编码,您的缺失值将被编码到一个新的二进制变量中,并且 KNNImputer 将无法处理它们,因为:

  • 它一次作用于每一列,而不是作用于整套单热编码列
  • 不会再有遗漏需要处理

无论如何,您有几个选项可以使用 scikit-learn 来估算缺失的分类变量:

  1. 您可以使用 sklearn.impute.SimpleImputerstrategy="most_frequent":这将使用每列中出现频率最高的值替换缺失值,无论它们是字符串还是数字数据
  2. 使用 sklearn.impute.KNNImputer with some limitation: you have first to transform your categorical features into numeric ones while preserving the NaN values (see: ),那么您可以使用 KNNImputer 仅使用最近的邻居作为替换(如果您使用多个邻居,它将呈现一些毫无意义的平均值)。例如:
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import LabelEncoder
    from sklearn.impute import KNNImputer
    
    df = pd.DataFrame({'A': ['x', np.NaN, 'z'], 'B': [1, 6, 9], 'C': [2, 1, np.NaN]})
    
    df = df.apply(lambda series: pd.Series(
        LabelEncoder().fit_transform(series[series.notnull()]),
        index=series[series.notnull()].index
    ))
    
    imputer = KNNImputer(n_neighbors=1)
    imputer.fit_transform(df)
    
    In:
        A   B   C
    0   x   1   2.0
    1   NaN 6   1.0
    2   z   9   NaN
    
    Out:
    array([[0., 0., 1.],
           [0., 1., 0.],
           [1., 2., 0.]])
  1. 对混合数据使用 sklearn.impute.IterativeImputer and replicate a MissForest imputer(但您必须将数字特征与分类特征分开处理)。例如:
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import LabelEncoder
    from sklearn.experimental import enable_iterative_imputer
    from sklearn.impute import IterativeImputer
    from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
    
    df = pd.DataFrame({'A': ['x', np.NaN, 'z'], 'B': [1, 6, 9], 'C': [2, 1, np.NaN]})
    
    categorical = ['A']
    numerical = ['B', 'C']
    
    df[categorical] = df[categorical].apply(lambda series: pd.Series(
        LabelEncoder().fit_transform(series[series.notnull()]),
        index=series[series.notnull()].index
    ))
    
    print(df)
    
    imp_num = IterativeImputer(estimator=RandomForestRegressor(),
                               initial_strategy='mean',
                               max_iter=10, random_state=0)
    imp_cat = IterativeImputer(estimator=RandomForestClassifier(), 
                               initial_strategy='most_frequent',
                               max_iter=10, random_state=0)
    
    df[numerical] = imp_num.fit_transform(df[numerical])
    df[categorical] = imp_cat.fit_transform(df[categorical])
    
    print(df)

对于任何感兴趣的人,我设法实现了一个忽略 np.nan 并与 sklearn 管道转换器兼容的自定义标签编码器,类似于他在他的 github 存储库中实现的 Luca Massaron 的 LEncoder:https://github.com/lmassaron/deep_learning_for_tabular_data

class CustomEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.encoders = dict()

    def fit(self, X, y=None):
        for col in X.columns:
            le = LabelEncoder()
            le.fit(X.loc[X[col].notna(), col])
            le_dict = dict(zip(le.classes_, le.transform(le.classes_)))

            # Set unknown to new value so transform on test set handles unknown values
            max_value = max(le_dict.values())
            le_dict['_unk'] = max_value + 1

            self.encoders[col] = le_dict
        return self

    def transform(self, X, y=None):
        for col in X.columns:
            le_dict = self.encoders[col]
            X.loc[X[col].notna(), col] = X.loc[X[col].notna(), col].apply(
                lambda x: le_dict.get(x, le_dict['_unk'])).values
        return X

    def fit_transform(self, X, y=None, **fit_params):
        self.fit(X, y)
        return self.transform(X, y)