How to use joblib with scikit-learn to cross-validate in parallel
I am trying to run cross-validation folds in parallel with the joblib library in Python.
I have the following sample code:
from sklearn.model_selection import KFold
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn import svm
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import LinearSVC

iris = datasets.load_iris()
X, Y = iris.data, iris.target

skf = StratifiedKFold(n_splits=5)
#clf = svm.LinearSVC()
clf = svm.SVC(kernel='rbf')
#clf = svm.SVC(kernel='linear')

f1_list = []

for train_index, test_index in skf.split(X, Y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = Y[train_index], Y[test_index]
    clf.fit(X_train, y_train)
    Y_predict = clf.predict(X_test)
    f1 = f1_score(y_test, Y_predict, average='weighted')
    print(f1)
    conf_mat = confusion_matrix(y_test, Y_predict)
    print(conf_mat)
    f1_list.append(f1)

print(f1_list)
I would like to run the for loop in parallel so that the score for each fold is computed concurrently.
I believe the joblib library has to be used along these lines:
from math import sqrt
from joblib import Parallel, delayed

def producer():
    for i in range(6):
        print('Produced %s' % i)
        yield i

out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
    delayed(sqrt)(i) for i in producer())
Any suggestions on how to integrate the parallel tasks?
When you call the Parallel object, you pass it delayed to specify the function you want to run in parallel. delayed returns a new function that wraps yours; you then call that wrapper with the arguments you would have passed to the original function.
In your example, the sqrt function is wrapped by delayed, and each i produced by range(6) is then dispatched to it in parallel.
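To make the mechanics concrete, here is a minimal sketch (not from the original post): delayed does not execute anything itself, it only records the function and its arguments so that Parallel can dispatch the call later.

```python
from math import sqrt
from joblib import Parallel, delayed

# delayed(f)(*args) only records the call; nothing runs yet.
# The result is roughly a (function, args, kwargs) triple.
task = delayed(sqrt)(9)
print(task)

# Parallel executes the recorded calls and returns results in input order.
out = Parallel(n_jobs=2)(delayed(sqrt)(i * i) for i in range(5))
print(out)  # [0.0, 1.0, 2.0, 3.0, 4.0]
```

Because results come back in the same order as the inputs, the output list lines up with the folds (or other inputs) you dispatched.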
What we need to do is give delayed a function that trains on one chunk of the data, and then pass the indices of each k-fold split to that wrapped function. Here is an example:
from sklearn.model_selection import KFold
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn import svm
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import LinearSVC
from joblib import Parallel, delayed

iris = datasets.load_iris()
X, Y = iris.data, iris.target

skf = StratifiedKFold(n_splits=5)
clf = svm.SVC(kernel='rbf')

def train(train_index, test_index):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = Y[train_index], Y[test_index]
    clf.fit(X_train, y_train)
    Y_predict = clf.predict(X_test)
    f1 = f1_score(y_test, Y_predict, average='weighted')
    conf_mat = confusion_matrix(y_test, Y_predict)
    return dict(f1=f1, conf_mat=conf_mat)

out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
    delayed(train)(train_index, test_index)
    for train_index, test_index in skf.split(X, Y))

f1_scores = [d['f1'] for d in out]
conf_mats = [d['conf_mat'] for d in out]
print('f1_scores:', f1_scores)
print('confusion matrices:', conf_mats)
Output:
f1_scores: [0.9665831244778613, 1.0, 0.9665831244778613, 0.9665831244778613, 1.0]
confusion matrices: [array([[10, 0, 0],
[ 0, 10, 0],
[ 0, 1, 9]], dtype=int64), array([[10, 0, 0],
[ 0, 10, 0],
[ 0, 0, 10]], dtype=int64), array([[10, 0, 0],
[ 0, 9, 1],
[ 0, 0, 10]], dtype=int64), array([[10, 0, 0],
[ 0, 9, 1],
[ 0, 0, 10]], dtype=int64), array([[10, 0, 0],
[ 0, 10, 0],
[ 0, 0, 10]], dtype=int64)]
out contains the metrics returned from the train function, so we can split out the f1 scores and confusion matrices separately if needed. Note that with joblib's default process-based backend, each worker operates on its own pickled copy of clf, so the parallel fits do not interfere with one another.
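As an aside (not part of the original answer), scikit-learn can parallelize the folds for you: cross_val_score accepts an n_jobs argument and uses joblib internally. Assuming the same data and estimator as above, a roughly equivalent sketch is:

```python
from sklearn import datasets, svm
from sklearn.model_selection import StratifiedKFold, cross_val_score

iris = datasets.load_iris()
X, Y = iris.data, iris.target

clf = svm.SVC(kernel='rbf')
skf = StratifiedKFold(n_splits=5)

# One fit/score per fold, dispatched across 2 joblib workers.
scores = cross_val_score(clf, X, Y, cv=skf,
                         scoring='f1_weighted', n_jobs=2)
print(scores)  # one weighted-f1 score per fold
```

This gives you the per-fold f1 scores directly; the hand-rolled Parallel loop above is still useful when you also want extra artifacts per fold, such as the confusion matrices.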