How to speed up nested cross validation in python?
From what I have found, there is one similar question (Speed-up nested cross-validation), but installing MPI did not work for me after trying several fixes suggested on this site and on Microsoft's, so I am hoping there is another package or an answer to this question.

I am looking to compare multiple algorithms and gridsearch a wide range of parameters (maybe too many parameters?). What ways are there, besides mpi4py, to speed up running my code? As I understand it, I cannot use n_jobs=-1 because the search would then no longer be nested?

Also to note: I have not been able to run this on the full set of parameters I am trying to look at below (it runs longer than I have time for). I only get results after 2 hours if I give each model just 2 parameters to compare. I am running this code on a dataset of 252 rows and 25 feature columns, with 4 categorical values to predict ('certain', 'likely', 'possible' or 'unlikely') for whether a gene (out of 252 genes) affects a disease. Using SMOTE increases the sample size to 420, which is what is then used.
dataset = pd.read_csv('data.csv')
data = dataset.drop(["gene"], axis=1)  # drop the gene identifier column
df = data.iloc[:, 0:24]                # select the feature columns
df = df.fillna(0)
X = MinMaxScaler().fit_transform(df)
le = preprocessing.LabelEncoder()
encoded_value = le.fit_transform(["certain", "likely", "possible", "unlikely"])  # note: overwritten by the refit below
Y = le.fit_transform(data["category"])
sm = SMOTE(random_state=100)           # oversample minority classes to balance the data
X_res, y_res = sm.fit_resample(X, Y)
seed = 7
logreg = LogisticRegression(penalty='l1', solver='liblinear',multi_class='auto')
LR_par= {'penalty':['l1'], 'C': [0.5, 1, 5, 10], 'max_iter':[500, 1000, 5000]}
rfc = RandomForestClassifier()
param_grid = {'bootstrap': [True, False],
'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
'max_features': ['auto', 'sqrt'],
'min_samples_leaf': [1, 2, 4,25],
'min_samples_split': [2, 5, 10, 25],
'n_estimators': [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}
mlp = MLPClassifier(random_state=seed)
parameter_space = {'hidden_layer_sizes': [(10,20), (10,20,10), (50,)],
'activation': ['tanh', 'relu'],
'solver': ['adam', 'sgd'],
'max_iter': [10000],
'alpha': [0.1, 0.01, 0.001],
'learning_rate': ['constant','adaptive']}
gbm = GradientBoostingClassifier(min_samples_split=25, min_samples_leaf=25)
param = {"loss":["deviance"],
"learning_rate": [0.15,0.1,0.05,0.01,0.005,0.001],
"min_samples_split": [2, 5, 10, 25],
"min_samples_leaf": [1, 2, 4,25],
"max_depth":[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
"max_features":['auto', 'sqrt'],
"criterion": ["friedman_mse"],
"n_estimators":[200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
}
svm = SVC(gamma="scale", probability=True)
tuned_parameters = {'kernel':('linear', 'rbf'), 'C':(1,0.25,0.5,0.75)}
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.wrappers.scikit_learn import KerasClassifier

def baseline_model(optimizer='adam', learn_rate=0.01):
    # note: learn_rate is accepted for the grid search but not used below
    model = Sequential()
    model.add(Dense(100, input_dim=X_res.shape[1], activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(50, activation='relu'))    # 50 hidden units in the second layer
    model.add(Dense(4, activation='softmax'))  # 4 output classes
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model

keras = KerasClassifier(build_fn=baseline_model, batch_size=32, epochs=100, verbose=0)
learn_rate = [0.001, 0.01, 0.1, 0.2, 0.3]
optimizer = ['SGD', 'RMSprop', 'Adagrad', 'Adadelta', 'Adam', 'Adamax', 'Nadam']
kerasparams = dict(optimizer=optimizer, learn_rate=learn_rate)
inner_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
outer_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
models = []
models.append(('GBM', GridSearchCV(gbm, param, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('RFC', GridSearchCV(rfc, param_grid, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('LR', GridSearchCV(logreg, LR_par, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('SVM', GridSearchCV(svm, tuned_parameters, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('MLP', GridSearchCV(mlp, parameter_space, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('Keras', GridSearchCV(estimator=keras, param_grid=kerasparams, cv=inner_cv,iid=False, n_jobs=1)))
results = []
names = []
scoring = 'accuracy'
X_train, X_test, Y_train, Y_test = train_test_split(X_res, y_res, test_size=0.2, random_state=0)
for name, model in models:
    nested_cv_results = model_selection.cross_val_score(model, X_res, y_res, cv=outer_cv, scoring=scoring)
    results.append(nested_cv_results)
    names.append(name)
    msg = "Nested CV Accuracy %s: %f (+/- %f )" % (name, nested_cv_results.mean()*100, nested_cv_results.std()*100)
    print(msg)
    model.fit(X_train, Y_train)
    print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100), '%')
    print("Best Parameters: \n{}\n".format(model.best_params_))
    print("Best CV Score: \n{}\n".format(model.best_score_))
For example, most of the dataset is binary, and looks like this:
gene Tissue Druggable Eigenvalue CADDvalue Catalogpresence Category
ACE 1 1 1 0 1 Certain
ABO 1 0 0 0 0 Likely
TP53 1 1 0 0 0 Possible
Any guidance on how I could speed this up would be appreciated.
Edit: I have also tried using dask for parallel processing, but I am not sure I am doing it right, and it does not seem to run any faster:
for name, model in models:
    with joblib.parallel_backend('dask'):
        nested_cv_results = model_selection.cross_val_score(model, X_res, y_res, cv=outer_cv, scoring=scoring)
        results.append(nested_cv_results)
        names.append(name)
        msg = "Nested CV Accuracy %s: %f (+/- %f )" % (name, nested_cv_results.mean()*100, nested_cv_results.std()*100)
        print(msg)
        model.fit(X_train, Y_train)
        print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100), '%')
        #print("Best Estimator: \n{}\n".format(model.best_estimator_))
        print("Best Parameters: \n{}\n".format(model.best_params_))
        print("Best CV Score: \n{}\n".format(model.best_score_))  # average over all CV folds for the single best parameter combination
Edit: Also, on reducing the gridsearch: I have already tried it with, for example, 5 parameters per model, and that still takes several hours to complete. So while reducing the number helps, any advice for efficiency beyond that would be appreciated.
Two things:

After applying SMOTE, reduce the dimensionality:

import umap
# min_dist, neighbours and smote_output are placeholders:
# smote_output is the resampled X returned by SMOTE
dim_reduced = umap.UMAP(
    min_dist=min_dist,
    n_neighbors=neighbours,
    random_state=1234,
).fit_transform(smote_output)

You can then use dim_reduced for the train test split.

Reducing the dimensionality will help remove noise from the data; instead of dealing with 25 features, you will bring them down to 2 (with UMAP) or to however many components you choose (with PCA). This should have a significant impact on performance.
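If UMAP is unavailable, PCA achieves the same kind of reduction. A minimal sketch (the synthetic array below stands in for the SMOTE output, and n_components=2 mirrors the UMAP suggestion above; both are illustrative choices):

```python
from sklearn.datasets import make_classification
from sklearn.decomposition import PCA

# Synthetic stand-in for the SMOTE output (420 rows x 25 features, as in the question)
X_res, _ = make_classification(n_samples=420, n_features=25, random_state=0)

# Project the 25 features down to 2 components before the train/test split
pca = PCA(n_components=2, random_state=1234)
dim_reduced = pca.fit_transform(X_res)
print(dim_reduced.shape)  # (420, 2)
```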
Dask-ML has scalable implementations of GridSearchCV and RandomizedSearchCV that are, I believe, drop-in replacements for the Scikit-Learn versions. They were developed alongside the Scikit-Learn developers.

They can be faster for two reasons:

- They avoid repeating shared work between different stages of a pipeline
- They can scale out to any cluster where Dask can be deployed (which is easy on most cluster infrastructure)
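As a sketch of how small the swap can be (assuming dask-ml is installed; the fallback import is only there so the snippet still runs without it):

```python
try:
    # dask-ml's scalable version; same constructor arguments and fit/predict API
    from dask_ml.model_selection import GridSearchCV
except ImportError:
    # fall back to the stock implementation if dask-ml is not installed
    from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# The rest of the code is unchanged regardless of which import succeeded
tuned_parameters = {'kernel': ('linear', 'rbf'), 'C': (1, 0.25, 0.5, 0.75)}
search = GridSearchCV(SVC(gamma="scale"), tuned_parameters, cv=3)
```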
There is an easy win in your situation, and that is .... to start using parallel processing :). dask will help you if you have a cluster (it will work on a single machine, but the improvement compared to the default scheduling in sklearn is not significant); if you plan to run it on a single machine (but with several cores/threads and "enough" memory), then you can run the nested CV in parallel. The only trick is that sklearn will not let you run the outer CV loop in multiple processes. However, it will let you run the inner loop in multiple threads.

At the moment you have n_jobs=None in the outer CV loop (that is the default in cross_val_score), which means n_jobs=1, and that is the only option you can use with sklearn in a nested CV.

However, you can achieve an easy gain by setting n_jobs=some_reasonable_number in all the GridSearchCV instances you use. some_reasonable_number does not have to be -1 (though that is a good starting point). Some algorithms either plateau at n_jobs=n_cores rather than n_threads (for example, xgboost) or already have built-in multiprocessing (for example, RandomForestClassifier), and they may clash if you spawn too many processes.
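A minimal sketch of that advice, using toy data and a deliberately tiny grid just to show where n_jobs goes: the outer cross_val_score stays sequential, and only the inner GridSearchCV is parallelized.

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV, KFold, cross_val_score
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
inner_cv = KFold(n_splits=3, shuffle=True, random_state=7)
outer_cv = KFold(n_splits=3, shuffle=True, random_state=7)

# Inner loop: n_jobs=2 plays the role of "some_reasonable_number";
# tune it to your core count (or start from -1)
clf = GridSearchCV(SVC(gamma="scale"), {'C': [0.5, 1.0]}, cv=inner_cv, n_jobs=2)

# Outer loop: left at the sklearn default (n_jobs=None, i.e. sequential)
scores = cross_val_score(clf, X, y, cv=outer_cv)
print(scores.mean())
```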
IIUC, you are trying to parallelize this example from the sklearn docs. If that is the case, then here is one possible approach to address "why dask is not working" and "Any kind of constructive guidance or further knowledge on this problem".
General imports
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn import preprocessing
from imblearn.over_sampling import SMOTE
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, GridSearchCV, KFold, train_test_split
from sklearn.neural_network import MLPClassifier
import dask_ml.model_selection as dcv
import time
Data

- I defined 3 datasets to try out the dask_ml implementation
- the size of the third one (dataset 3) is adjustable in its number of rows, which can be increased arbitrarily depending on your computing power
- I timed the execution of dask_ml with this dataset only
- the code below works for all 3 datasets
- dataset 1 is a slightly longer version of the example data in the SO question
#### Dataset 1 - longer version of data in the question
d = """gene Tissue Druggable Eigenvalue CADDvalue Catalogpresence Category
ACE 1 1 1 0 1 Certain
ABO 1 0 0 0 0 Likely
TP53 1 1 0 0 0 Possible"""
data = pd.DataFrame([x.split(' ') for x in d.split('\n')])
data.columns = data.loc[0,:]
data.drop(0, axis=0, inplace=True)
data = pd.concat([data]*15)
data = data.drop(["gene"], axis=1)
df = data.iloc[:,0:5]
X = MinMaxScaler().fit_transform(df)
le = preprocessing.LabelEncoder()
encoded_value = le.fit_transform(["Certain", "Likely", "Possible"])
Y = le.fit_transform(data["Category"])
sm = SMOTE(random_state=100)
X_res, y_res = sm.fit_resample(X, Y)
#### Dataset 2 - iris dataset from example in sklearn nested cross validation docs
# Load the dataset
from sklearn.datasets import load_iris
iris = load_iris()
X_res = iris.data
y_res = iris.target
#### Dataset 3 - size (#rows, #columns) is adjustable (I used this to time code execution)
X_res = pd.DataFrame(np.random.rand(300,50), columns=['col_'+str(c+1) for c in list(range(50))])
from random import shuffle
cats = ["paris", "barcelona", "kolkata", "new york", 'sydney']
y_values = cats*int(len(X_res)/len(cats))
shuffle(y_values)
y_res = pd.Series(y_values)
Instantiate the classifiers - no change from the code in the question
seed = 7
logreg = LogisticRegression(penalty='l1', solver='liblinear',multi_class='auto')
LR_par= {'penalty':['l1'], 'C': [0.5, 1, 5, 10], 'max_iter':[500, 1000, 5000]}
mlp = MLPClassifier(random_state=seed)
parameter_space = {'hidden_layer_sizes': [(10,20), (10,20,10), (50,)],
'activation': ['tanh', 'relu'],
'solver': ['adam', 'sgd'],
'max_iter': [10000],
'alpha': [0.1, 0.01, 0.001],
'learning_rate': ['constant','adaptive']}
rfc = RandomForestClassifier()
param_grid = {'bootstrap': [True, False],
'max_depth': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
'max_features': ['auto', 'sqrt'],
'min_samples_leaf': [1, 2, 4,25],
'min_samples_split': [2, 5, 10, 25],
'n_estimators': [200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]}
gbm = GradientBoostingClassifier(min_samples_split=25, min_samples_leaf=25)
param = {"loss":["deviance"],
"learning_rate": [0.15,0.1,0.05,0.01,0.005,0.001],
"min_samples_split": [2, 5, 10, 25],
"min_samples_leaf": [1, 2, 4,25],
"max_depth":[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, None],
"max_features":['auto', 'sqrt'],
"criterion": ["friedman_mse"],
"n_estimators":[200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
}
svm = SVC(gamma="scale", probability=True)
tuned_parameters = {'kernel':('linear', 'rbf'), 'C':(1,0.25,0.5,0.75)}
inner_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
outer_cv = KFold(n_splits=10, shuffle=True, random_state=seed)
Use the GridSearchCV implemented by dask_ml (as originally suggested by @MRocklin) - see the dask_ml docs for dask_ml.model_selection.GridSearchCV

- for brevity, I exclude the KerasClassifier and the helper function baseline_model(), but my approach to handling the former would be the same as for the others
models = []
models.append(('MLP', dcv.GridSearchCV(mlp, parameter_space, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('GBM', dcv.GridSearchCV(gbm, param, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('RFC', dcv.GridSearchCV(rfc, param_grid, cv=inner_cv,iid=False, n_jobs=1)))
models.append(('LR', dcv.GridSearchCV(logreg, LR_par, cv=inner_cv, iid=False, n_jobs=1)))
models.append(('SVM', dcv.GridSearchCV(svm, tuned_parameters, cv=inner_cv, iid=False, n_jobs=1)))
Initialize an additional empty list to hold the non-nested CV results
non_nested_results = []
nested_results = []
names = []
scoring = 'accuracy'
X_train, X_test, Y_train, Y_test = train_test_split(X_res, y_res, test_size=0.2, random_state=0)
Joblib and dask client setup

- I created a cluster on the local machine
# Create a local cluster
from dask.distributed import Client
client = Client(processes=False, threads_per_worker=4,
n_workers=1, memory_limit='6GB')
from sklearn.externals import joblib
Perform the nested CV

- GridSearchCV is executed first
- cross_val_score second
- note that, for demo purposes, I used only 1 sklearn model (SVC) from the list of models in the question's example code
start = time.time()
for name, model in [models[-1]]:
    # Non-nested parameter search and scoring
    with joblib.parallel_backend('dask'):
        model.fit(X_train, Y_train)
    non_nested_results.append(model.best_score_)
    # Nested CV with parameter optimization
    nested_score = cross_val_score(model, X=X_train, y=Y_train, cv=outer_cv)
    nested_results.append(nested_score.mean())
    names.append(name)
    msg = "Nested CV Accuracy %s: %f (+/- %f )" %\
          (name, np.mean(nested_results)*100, np.std(nested_results)*100)
    print(msg)
    print('Test set accuracy: {:.2f}'.format(model.score(X_test, Y_test)*100), '%')
    print("Best Estimator: \n{}\n".format(model.best_estimator_))
    print("Best Parameters: \n{}\n".format(model.best_params_))
    print("Best CV Score: \n{}\n".format(model.best_score_))
score_difference = [a_i - b_i for a_i, b_i in zip(non_nested_results, nested_results)]
print("Average difference of {0:6f} with std. dev. of {1:6f}."
      .format(np.mean(score_difference), np.std(score_difference)))
print('Total running time of the script: {:.2f} seconds'.format(time.time()-start))
client.close()
Below is the output (with script execution times) using dataset 3.

Output + timing without dask [1]
Nested CV Accuracy SVM: 20.416667 (+/- 0.000000 )
Test set accuracy: 16.67 %
Best Estimator:
SVC(C=0.75, cache_size=200, class_weight=None, coef0=0.0,
decision_function_shape='ovr', degree=3, gamma='scale', kernel='linear',
max_iter=-1, probability=True, random_state=None, shrinking=True,
tol=0.001, verbose=False)
Best Parameters:
{'C': 0.75, 'kernel': 'linear'}
Best CV Score:
0.2375
Average difference of 0.033333 with std. dev. of 0.000000.
Total running time of the script: 23.96 seconds
Output + timing with dask (using n_workers=1 and threads_per_worker=4) [2]
Nested CV Accuracy SVM: 18.750000 (+/- 0.000000 )
Test set accuracy: 13.33 %
Best Estimator:
SVC(C=0.5, cache_size=200, class_weight=None, coef0=0.0,
decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
max_iter=-1, probability=True, random_state=None, shrinking=True,
tol=0.001, verbose=False)
Best Parameters:
{'C': 0.5, 'kernel': 'rbf'}
Best CV Score:
0.1916666666666667
Average difference of 0.004167 with std. dev. of 0.000000.
Total running time of the script: 8.84 seconds
Output + timing with dask (using n_workers=4 and threads_per_worker=4) [2]
Nested CV Accuracy SVM: 23.333333 (+/- 0.000000 )
Test set accuracy: 21.67 %
Best Estimator:
SVC(C=0.25, cache_size=200, class_weight=None, coef0=0.0,
decision_function_shape='ovr', degree=3, gamma='scale', kernel='linear',
max_iter=-1, probability=True, random_state=None, shrinking=True,
tol=0.001, verbose=False)
Best Parameters:
{'C': 0.25, 'kernel': 'linear'}
Best CV Score:
0.25
Average difference of 0.016667 with std. dev. of 0.000000.
Total running time of the script: 7.52 seconds
Output + timing with dask (using n_workers=1 and threads_per_worker=8) [2]
Nested CV Accuracy SVM: 20.416667 (+/- 0.000000 )
Test set accuracy: 18.33 %
Best Estimator:
SVC(C=1, cache_size=200, class_weight=None, coef0=0.0,
decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
max_iter=-1, probability=True, random_state=None, shrinking=True,
tol=0.001, verbose=False)
Best Parameters:
{'C': 1, 'kernel': 'rbf'}
Best CV Score:
0.23333333333333334
Average difference of 0.029167 with std. dev. of 0.000000.
Total running time of the script: 7.06 seconds
[1] used sklearn.model_selection.GridSearchCV() without joblib()

[2] used dask_ml.model_selection.GridSearchCV() instead of sklearn.model_selection.GridSearchCV(), with joblib()
Notes about the code and output in this answer

- I noticed in your question that the order of sklearn.model_selection.GridSearchCV() and cross_val_score was reversed compared to the example in the docs
- not sure if this matters much for your question, but I thought I would mention it
- I have no experience with nested cross-validation, so I cannot comment on whether Client(..., n_workers=n, threads_per_worker=m) with n>1 and/or m=4 or m=8 is acceptable/incorrect
General comments about dask_ml usage (as I understand it)

- Case 1: if the training data is small enough to fit into memory on a single machine, but the testing dataset does not fit into memory, you can use the wrapper ParallelPostFit
- read the testing data in parallel onto the cluster
- make predictions on the testing data in parallel, using all the workers on the cluster
- IIUC, this case is not relevant to your question
- Case 2: if you want to use joblib to train a large scikit-learn model on a cluster (but the training/testing data fits in memory), a.k.a. distributed scikit-learn
- then you can use the cluster to do the training; the skeleton code (per the dask_ml docs) looks like the one shown below
- IIUC, this case is
- relevant to your question
- the approach I used in this answer
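A sketch of that Case 2 skeleton. Here it uses joblib's stock 'loky' backend so the snippet runs stand-alone; with a dask.distributed.Client running (as set up earlier in this answer), you would pass 'dask' instead, and the rest of the code stays the same:

```python
import joblib
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
search = GridSearchCV(SVC(gamma="scale"), {'C': [0.5, 1.0]}, cv=3, n_jobs=-1)

# Swap 'loky' for 'dask' once a Client is connected; the fit then
# fans out to the cluster's workers instead of local processes
with joblib.parallel_backend('loky'):
    search.fit(X, y)
print(search.best_params_)
```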
System details (used to execute the code)
dask==1.2.0
dask-ml==0.12.0
numpy==1.16.2+mkl
pandas==0.24.0
scikit-learn==0.20.3
sklearn==0.0
OS==Windows 8 (64-bit)
Python version (import platform; print(platform.python_version()))==3.7.2