
How to speed up nested cross validation in python?

From what I have found there is one similar question (Speed-up nested cross-validation), but installing MPI did not work for me after trying the several fixes suggested on this site and on Microsoft's, so I am hoping there is another package or an answer to this question.

I am looking to compare multiple algorithms and gridsearch a wide range of parameters (maybe too many parameters?). Are there any ways, besides mpi4py, to speed up running my code? As I understand it, I cannot use n_jobs=-1 because that is not nested?

Also to note, I cannot run this on the many parameters I am trying to look at below (it runs longer than the time I have). If I only give each model 2 parameters to compare, it takes 2 hours before it has a result. I am also running this code on a dataset of 252 rows and 25 feature columns, with 4 categorical variables to predict ('certain', 'likely', 'possible' or 'unlikely') indicating whether a gene (out of 252 genes) affects a disease. Using SMOTE increases the sample size to 420, which is then what gets used.

dataset= pd.read_csv('data.csv')
data = dataset.drop(["gene"], axis=1)
df = data.iloc[:,0:24]
df = df.fillna(0)
X = MinMaxScaler().fit_transform(df)

le = preprocessing.LabelEncoder()
encoded_value = le.fit_transform(["certain", "likely", "possible", "unlikely"])
Y = le.fit_transform(data["category"])

sm = SMOTE(random_state=100)
X_res, y_res = sm.fit_resample(X, Y)

seed = 7
logreg = LogisticRegression(penalty='l1', solver='liblinear',multi_class='auto')
LR_par= {'penalty':['l1'], 'C': [0.5, 1, 5, 10], 'max_iter':[500, 1000, 5000]}

rfc =RandomForestClassifier()
param_grid = {'bootstrap': [True, False],
              'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
              'max_features': ['auto', 'sqrt'],
              'min_samples_leaf': [1, 2, 4,25],
              'min_samples_split': [2, 5, 10, 25],
              'n_estimators': [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}

mlp = MLPClassifier(random_state=seed)
parameter_space = {'hidden_layer_sizes': [(10,20), (10,20,10), (50,)],
     'activation': ['tanh', 'relu'],
     'solver': ['adam', 'sgd'],
     'max_iter': [10000],
     'alpha': [0.1, 0.01, 0.001],
     'learning_rate': ['constant','adaptive']}

gbm = GradientBoostingClassifier(min_samples_split=25, min_samples_leaf=25)
param = {"loss":["deviance"],
    "learning_rate": [0.15,0.1,0.05,0.01,0.005,0.001],
    "min_samples_split": [2, 5, 10, 25],
    "min_samples_leaf": [1, 2, 4,25],
    "max_depth":[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
    "max_features":['auto', 'sqrt'],
    "criterion": ["friedman_mse"],
    "n_estimators":[200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
    }

svm = SVC(gamma="scale", probability=True)
tuned_parameters = {'kernel':('linear', 'rbf'), 'C':(1,0.25,0.5,0.75)}

def baseline_model(optimizer='adam', learn_rate=0.01):
    model = Sequential()
    model.add(Dense(100, input_dim=X_res.shape[1], activation='relu')) 
    model.add(Dropout(0.5))
    model.add(Dense(50, activation='relu')) #8 is the dim/ the number of hidden units (units are the kernel)
    model.add(Dense(4, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

keras = KerasClassifier(build_fn=baseline_model, batch_size=32, epochs=100, verbose=0)
learn_rate = [0.001, 0.01, 0.1, 0.2, 0.3]
optimizer = ['SGD', 'RMSprop', 'Adagrad', 'Adadelta', 'Adam', 'Adamax', 'Nadam']
kerasparams = dict(optimizer=optimizer, learn_rate=learn_rate)

inner_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
outer_cv = KFold(n_splits=10, shuffle=True, random_state=seed)

models = []
models.append(('GBM', GridSearchCV(gbm, param, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('RFC', GridSearchCV(rfc, param_grid, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('LR', GridSearchCV(logreg, LR_par, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('SVM', GridSearchCV(svm, tuned_parameters, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('MLP', GridSearchCV(mlp, parameter_space, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('Keras', GridSearchCV(estimator=keras, param_grid=kerasparams, cv=inner_cv,iid=False, n_jobs=1)))


results = []
names = []
scoring = 'accuracy'
X_train, X_test, Y_train, Y_test = train_test_split(X_res, y_res, test_size=0.2, random_state=0)


for name, model in models:
    nested_cv_results = model_selection.cross_val_score(model, X_res, y_res, cv=outer_cv, scoring=scoring)
    results.append(nested_cv_results)
    names.append(name)
    msg = "Nested CV Accuracy %s: %f (+/- %f )" % (name, nested_cv_results.mean()*100, nested_cv_results.std()*100)
    print(msg)
    model.fit(X_train, Y_train)
    print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100),  '%')
    print("Best Parameters: \n{}\n".format(model.best_params_))
    print("Best CV Score: \n{}\n".format(model.best_score_))

For example, the majority of the dataset is binary, like below:

gene   Tissue    Druggable Eigenvalue CADDvalue Catalogpresence   Category
ACE      1           1         1          0           1            Certain
ABO      1           0         0          0           0            Likely
TP53     1           1         0          0           0            Possible

Any guidance on how I could speed this up would be appreciated.

Edit: I have also tried using parallel processing with dask, but I am not sure I am doing it right, and it does not seem to run any faster:

for name, model in models:
    with joblib.parallel_backend('dask'):
        nested_cv_results = model_selection.cross_val_score(model, X_res, y_res, cv=outer_cv, scoring=scoring)
        results.append(nested_cv_results)
        names.append(name)
        msg = "Nested CV Accuracy %s: %f (+/- %f )" % (name, nested_cv_results.mean()*100, nested_cv_results.std()*100)
        print(msg)
        model.fit(X_train, Y_train)
        print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100),  '%')
    #print("Best Estimator: \n{}\n".format(model.best_estimator_))
        print("Best Parameters: \n{}\n".format(model.best_params_))
        print("Best CV Score: \n{}\n".format(model.best_score_)) #average of all cv folds for a single combination of the parameters you specify 

Edit: Also to note with reducing the gridsearch, I have tried with, for example, 5 parameters per model, but this still takes several hours to complete, so as well as trimming down the numbers, any advice on efficiency beyond that would be appreciated.

Two things:

  1. Instead of GridSearch try using HyperOpt - it is a Python library for serial and parallel optimization.

  2. I would reduce the dimensionality by using UMAP or PCA. UMAP is probably the better choice.

After applying SMOTE:

import umap

dim_reduced = umap.UMAP(
        min_dist=min_dist,
        n_neighbors=neighbours,
        random_state=1234,
    ).fit_transform(smote_output)

You can then use dim_reduced for the train-test split.

Reducing the dimensionality will help to remove noise from the data, and instead of dealing with 25 features you will bring them down to 2 (using UMAP) or the number of components you choose (using PCA). This should have a significant impact on performance.

Dask-ML has scalable implementations of GridSearchCV and RandomizedSearchCV, which are, I believe, drop-in replacements for the Scikit-Learn versions. They were developed alongside Scikit-Learn developers.

They can be faster for two reasons:

There is an easy win in your case and that is .... start using parallel processing :). dask will help you if you have a cluster (it will work on a single machine, but the improvement compared to the default scheduling in sklearn is not significant), but if you plan to run it on a single machine (with several cores/threads and "enough" memory) then you can run nested CV in parallel. The only trick is that sklearn will not allow you to run the outer CV loop in multiple processes. However, it will allow you to run the inner loop in multiple threads.

At the moment you have n_jobs=None in the outer CV loop (that is the default in cross_val_score), which means n_jobs=1, and that is the only option you can use with sklearn in nested CV.

However, you can achieve an easy gain by setting n_jobs=some_reasonable_number in all the GridSearchCV instances you use. some_reasonable_number does not have to be -1 (but it is a good starting point). Some algorithms either plateau at n_jobs=n_cores instead of n_threads (for example, xgboost), or already have built-in multiprocessing (for example, RandomForestClassifier), and may clash if you spawn too many processes.
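A minimal sketch of the change described above: parallelize the inner grid search via n_jobs while leaving the outer cross_val_score loop at its serial default (iris and the worker count of 4 are stand-ins, not values from the question):

```python
# Parallel inner loop, serial outer loop: set n_jobs on GridSearchCV,
# keep cross_val_score at its default (n_jobs=None, i.e. serial).
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV, KFold, cross_val_score
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)  # stand-in for X_res, y_res

inner_cv = KFold(n_splits=5, shuffle=True, random_state=7)
outer_cv = KFold(n_splits=5, shuffle=True, random_state=7)

svm = SVC(gamma="scale")
tuned_parameters = {"kernel": ("linear", "rbf"), "C": (1, 0.25, 0.5, 0.75)}

# n_jobs=4 parallelizes the (folds x candidates) fits of the inner search
clf = GridSearchCV(svm, tuned_parameters, cv=inner_cv, n_jobs=4)

nested_scores = cross_val_score(clf, X, y, cv=outer_cv)  # outer loop stays serial
print(nested_scores.mean())
```

Each outer fold then runs its 40 inner fits (5 folds x 8 candidates) across 4 workers instead of one.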

IIUC, you are trying to parallelize this example from the sklearn docs. If this is the case, then here is one possible approach to address:

why dask is not working

Any kind of constructive guidance or further knowledge on this problem

General imports

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn import preprocessing
from imblearn.over_sampling import SMOTE
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, GridSearchCV, KFold, train_test_split
from sklearn.neural_network import MLPClassifier
import dask_ml.model_selection as dcv


import time

Data

  • I defined 3 datasets to try out the dask_ml implementation
    • the size (number of rows) of the third one (dataset 3) is adjustable and can be increased arbitrarily, depending on your computing power
      • I timed the execution of dask_ml using this dataset only
    • the code below works for all 3 datasets
    • dataset 1 is a slightly longer version of the sample data in the SO question
#### Dataset 1 - longer version of data in the question
d = """gene Tissue Druggable Eigenvalue CADDvalue Catalogpresence Category
ACE 1 1 1 0 1 Certain
ABO 1 0 0 0 0 Likely
TP53 1 1 0 0 0 Possible"""
data = pd.DataFrame([x.split(' ') for x in d.split('\n')])
data.columns = data.loc[0,:]
data.drop(0, axis=0, inplace=True)
data = pd.concat([data]*15)

data = data.drop(["gene"], axis=1)
df = data.iloc[:,0:5]

X = MinMaxScaler().fit_transform(df)
le = preprocessing.LabelEncoder()
encoded_value = le.fit_transform(["Certain", "Likely", "Possible"])
Y = le.fit_transform(data["Category"])

sm = SMOTE(random_state=100)
X_res, y_res = sm.fit_resample(X, Y)
#### Dataset 2 - iris dataset from example in sklearn nested cross validation docs
# Load the dataset
from sklearn.datasets import load_iris
iris = load_iris()
X_res = iris.data
y_res = iris.target
#### Dataset 3 - size (#rows, #columns) is adjustable (I used this to time code execution)
X_res = pd.DataFrame(np.random.rand(300,50), columns=['col_'+str(c+1) for c in list(range(50))])
from random import shuffle
cats = ["paris", "barcelona", "kolkata", "new york", 'sydney']
y_values = cats*int(len(X_res)/len(cats))
shuffle(y_values)
y_res = pd.Series(y_values)

Instantiate the classifiers - no change from the code in the question

seed = 7
logreg = LogisticRegression(penalty='l1', solver='liblinear',multi_class='auto')
LR_par= {'penalty':['l1'], 'C': [0.5, 1, 5, 10], 'max_iter':[500, 1000, 5000]}

mlp = MLPClassifier(random_state=seed)
parameter_space = {'hidden_layer_sizes': [(10,20), (10,20,10), (50,)],
     'activation': ['tanh', 'relu'],
     'solver': ['adam', 'sgd'],
     'max_iter': [10000],
     'alpha': [0.1, 0.01, 0.001],
     'learning_rate': ['constant','adaptive']}

rfc =RandomForestClassifier()
param_grid = {'bootstrap': [True, False],
              'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
              'max_features': ['auto', 'sqrt'],
              'min_samples_leaf': [1, 2, 4,25],
              'min_samples_split': [2, 5, 10, 25],
              'n_estimators': [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}

gbm = GradientBoostingClassifier(min_samples_split=25, min_samples_leaf=25)
param = {"loss":["deviance"],
    "learning_rate": [0.15,0.1,0.05,0.01,0.005,0.001],
    "min_samples_split": [2, 5, 10, 25],
    "min_samples_leaf": [1, 2, 4,25],
    "max_depth":[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
    "max_features":['auto', 'sqrt'],
    "criterion": ["friedman_mse"],
    "n_estimators":[200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
    }

svm = SVC(gamma="scale", probability=True)
tuned_parameters = {'kernel':('linear', 'rbf'), 'C':(1,0.25,0.5,0.75)}
inner_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
outer_cv = KFold(n_splits=10, shuffle=True, random_state=seed)

Implement GridSearchCV with dask_ml (as originally suggested by @MRocklin) - see the dask_ml docs for dask_ml.model_selection.GridSearchCV

  • For brevity, I have excluded KerasClassifier and the helper function baseline_model(), but my approach to handling the former would be the same as for the others
models = []
models.append(('MLP', dcv.GridSearchCV(mlp, parameter_space, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('GBM', dcv.GridSearchCV(gbm, param, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('RFC', dcv.GridSearchCV(rfc, param_grid, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('LR', dcv.GridSearchCV(logreg, LR_par, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('SVM', dcv.GridSearchCV(svm, tuned_parameters, cv=inner_cv, iid=False, n_jobs=1)))

Initialize an extra blank list to hold the non-nested CV results

non_nested_results = []
nested_results = []
names = []
scoring = 'accuracy'
X_train, X_test, Y_train, Y_test = train_test_split(X_res, y_res, test_size=0.2, random_state=0)

Joblib and dask client setup

# Create a local cluster
from dask.distributed import Client
client = Client(processes=False, threads_per_worker=4,
        n_workers=1, memory_limit='6GB')
from sklearn.externals import joblib

As per the sklearn docs example

Perform the nested CV
  • execute GridSearchCV first
  • use cross_val_score second
  • Note, for demonstration purposes, I have only used 1 sklearn model (SVC) from the list of models in the question's example code
start = time.time()
for name, model in [models[-1]]:
  # Non_nested parameter search and scoring
  with joblib.parallel_backend('dask'):
    model.fit(X_train, Y_train)
  non_nested_results.append(model.best_score_)

  # Nested CV with parameter optimization
  nested_score = cross_val_score(model, X=X_train, y=Y_train, cv=outer_cv)
  nested_results.append(nested_score.mean())

  names.append(name)
  msg = "Nested CV Accuracy %s: %f (+/- %f )" %\
        (name, np.mean(nested_results)*100, np.std(nested_results)*100)
  print(msg)
  print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100),  '%')
  print("Best Estimator: \n{}\n".format(model.best_estimator_))
  print("Best Parameters: \n{}\n".format(model.best_params_))
  print("Best CV Score: \n{}\n".format(model.best_score_))

score_difference = [a_i - b_i for a_i, b_i in zip(non_nested_results, nested_results)]
print("Average difference of {0:6f} with std. dev. of {1:6f}."
      .format(np.mean(score_difference), np.std(score_difference)))

print('Total running time of the script: {:.2f} seconds' .format(time.time()-start))

client.close()

Below is the output (with script execution time) using dataset 3.

Output + timing without dask ¹
Nested CV Accuracy SVM: 20.416667 (+/- 0.000000 )
Test set accuracy: 16.67 %
Best Estimator: 
SVC(C=0.75, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='linear',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 0.75, 'kernel': 'linear'}

Best CV Score: 
0.2375

Average difference of 0.033333 with std. dev. of 0.000000.
Total running time of the script: 23.96 seconds

Output + timing with dask (using n_workers=1 and threads_per_worker=4) ²

Nested CV Accuracy SVM: 18.750000 (+/- 0.000000 )
Test set accuracy: 13.33 %
Best Estimator: 
SVC(C=0.5, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 0.5, 'kernel': 'rbf'}

Best CV Score: 
0.1916666666666667

Average difference of 0.004167 with std. dev. of 0.000000.
Total running time of the script: 8.84 seconds

Output + timing with dask (using n_workers=4 and threads_per_worker=4) ²

Nested CV Accuracy SVM: 23.333333 (+/- 0.000000 )
Test set accuracy: 21.67 %
Best Estimator: 
SVC(C=0.25, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='linear',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 0.25, 'kernel': 'linear'}

Best CV Score: 
0.25

Average difference of 0.016667 with std. dev. of 0.000000.
Total running time of the script: 7.52 seconds

Output + timing with dask (using n_workers=1 and threads_per_worker=8) ²

Nested CV Accuracy SVM: 20.416667 (+/- 0.000000 )
Test set accuracy: 18.33 %
Best Estimator: 
SVC(C=1, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
  max_iter=-1, probability=True, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Best Parameters: 
{'C': 1, 'kernel': 'rbf'}

Best CV Score: 
0.23333333333333334

Average difference of 0.029167 with std. dev. of 0.000000.
Total running time of the script: 7.06 seconds

¹ using sklearn.model_selection.GridSearchCV() and without joblib()

² using dask_ml.model_selection.GridSearchCV() in place of sklearn.model_selection.GridSearchCV(), and with joblib()

Comments about the code and output in this answer

  • I noticed in your question that you had the order of sklearn.model_selection.GridSearchCV() and cross_val_score reversed, compared to the example in the docs
    • not sure if this affects your question too much, but I thought I would mention it
  • I do not have experience with nested cross-validation, so I cannot comment on whether Client(..., n_workers=n, threads_per_worker=m), with n>1 and/or m=4 or m=8, is acceptable/incorrect

General comments about usage of dask_ml (as I understand it)

  • Case 1: if the training data is small enough to fit into memory on a single machine, but the testing dataset does not fit into memory, you can use the wrapper ParallelPostFit
    • read the testing data in parallel onto the cluster
    • make predictions on the testing data in parallel, using all the workers on the cluster
    • IIUC, this case is not relevant to your question
  • Case 2: if you would like to use joblib to train a large scikit-learn model on a cluster (but the training/testing data fits into memory) - a.k.a. distributed scikit-learn - then you can use the cluster to do the training, and the skeleton code (as per the dask_ml docs) looks like this below
    • IIUC, this case is
      • relevant to your question
      • the approach I have used in this answer
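A minimal sketch of that Case 2 skeleton, adapted from the dask docs (note I use plain `import joblib` here rather than the older `sklearn.externals.joblib`; `Client(processes=False)` and the iris data are stand-ins for a real cluster and your data):

```python
# Minimal sketch of distributed scikit-learn training via joblib + dask.
# Client(processes=False) starts an in-process scheduler; on a real cluster
# you would pass the scheduler address instead, e.g. Client('scheduler:8786').
import joblib
from dask.distributed import Client
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

client = Client(processes=False)

X, y = load_iris(return_X_y=True)  # stand-in for the question's data
grid = GridSearchCV(SVC(gamma="scale"), {"C": [0.5, 1.0]}, cv=3)

# joblib dispatches the individual fits to the dask workers
with joblib.parallel_backend("dask"):
    grid.fit(X, y)

print(grid.best_params_)
client.close()
```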

System details (used for executing the code)

dask==1.2.0
dask-ml==0.12.0
numpy==1.16.2+mkl
pandas==0.24.0
scikit-learn==0.20.3
sklearn==0.0
OS==Windows 8 (64-bit)
Python version (import platform; print(platform.python_version()))==3.7.2