Python 并行化错误的多处理 - "function' object is not iterable"
Multiprocessing for Python parallelization error - "function' object is not iterable"
我们的数据中心有 NVIDIA Tesla K80 GPU 加速器计算,具有以下特点:Intel(R) Xeon(R) CPU E5-2670 v3 @2.30GHz, 48 CPU processors, 128GB RAM, 12 CPU cores
运行ning under Linux 64 -bit.
我运行正在编写以下代码,它在将不同的数据帧集垂直附加到 RandomForestRegressor
模型的单个数据系列后执行 GridSearchCV
。例如,我正在考虑的两个样本数据集位于 this link
from joblib import Parallel, delayed
import multiprocessing
import sys
import imp
import glob
import os
import pandas as pd
import math
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
import matplotlib
from sklearn.model_selection import cross_val_score
import matplotlib.pyplot as plt
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LassoCV
from sklearn.metrics import r2_score, mean_squared_error, make_scorer
from sklearn.model_selection import train_test_split
from math import sqrt
from sklearn.cross_validation import train_test_split
df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', "cubic*.csv"))), ignore_index=True)
#df = pd.read_csv('cubic31.csv')
for i in range(1,3):
df['X_t'+str(i)] = df['X'].shift(i)
print(df)
df.dropna(inplace=True)
X = (pd.DataFrame({ 'X_%d'%i : df['X'].shift(i) for i in range(3)}).apply(np.nan_to_num, axis=0).values)
X = df.drop('Y', axis=1)
y = df['Y']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.40)
X_train = X_train.drop('time', axis=1)
X_test = X_test.drop('time', axis=1)
print(X.shape)
print(df['Y'].shape)
print()
print("Size of X_train:",(len(X_train)))
print("Size of Y_train:",(len(X_train)))
print("Size of X_test:",(len(X_test)))
print("Size of Y_test:",(len(y_test)))
print()
def gridSearchCVParallel():
#Fit models with some grid search CV=5 (not to low), use the best model
parameters = {'n_estimators': [10,30,100,500,1000]}
clf_rf = RandomForestRegressor(random_state=1)
clf = GridSearchCV(clf_rf, parameters, cv=5, scoring='neg_mean_squared_error')
model = clf.fit(X_train, y_train)
model.cv_results_['params'][model.best_index_]
math.sqrt(model.best_score_*-1)
model.grid_scores_
#####
print()
print(model.grid_scores_)
print("The best score: ",model.best_score_)
print("RMSE:",math.sqrt(model.best_score_*-1))
#reg = RandomForestRegressor(criterion='mse')
clf_rf.fit(X_train,y_train)
modelPrediction = clf_rf.predict(X_test)
print(modelPrediction)
print("Number of predictions:",len(modelPrediction))
meanSquaredError=mean_squared_error(y_test, modelPrediction)
print("Mean Square Error (MSE):", meanSquaredError)
rootMeanSquaredError = sqrt(meanSquaredError)
print("Root-Mean-Square Error (RMSE):", rootMeanSquaredError)
####### to add the trendline
fig, ax = plt.subplots()
#df.plot(x='time', y='Y', ax=ax)
ax.plot(df['time'].values, df['Y'].values)
fig, ax = plt.subplots()
index_values=range(0,len(y_test))
y_test.sort_index(inplace=True)
X_test.sort_index(inplace=True)
modelPred_test = clf_rf.predict(X_test)
ax.plot(pd.Series(index_values), y_test.values)
PlotInOne=pd.DataFrame(pd.concat([pd.Series(modelPred_test), pd.Series(y_test.values)], axis=1))
plt.figure(); PlotInOne.plot(); plt.legend(loc='best')
NumberOfCores = multiprocessing.cpu_count()
gridResults = Parallel(n_jobs=NumberOfCores)(delayed(gridSearchCVParallel))
print(gridResults)
当我最终 运行 这个程序用于一个巨大的数据集(大约 200 万行)时,GridSearchCV
花了 4 天多的时间。经过一番搜索,我发现 Python
线程可以使用多个 CPU 使用 concurrent.futures
或 multiprocessing
。正如我的代码中所示,我尝试使用 multiplrocessing
,但出现此错误 TypeError: 'function' object is not iterable
。这似乎是该函数应该将单个参数作为输入,我们传入一个可迭代对象作为参数。我该如何解决这个问题,以便利用多个 CPU 并在短时间内更快地完成任务?
提前谢谢你。
不要尝试自己并行化。 不要使用 joblib.Parallel
。无论如何,您将重新发明轮子,因为 GridSearchCV
是 已经并行化的 。只需传递 n_jobs
参数,默认为 1
,即默认使用单个作业。要利用多核架构,请传递 n_jobs = number_of_cores
,其中 number_of_cores
是您要使用的内核数。
如果您选中 source code, you'll see it basically wraps a call to joblib.Parallel,那么 n_jobs=-1
应该适用于 "all cores"。
我们的数据中心有 NVIDIA Tesla K80 GPU 加速器计算,具有以下特点:Intel(R) Xeon(R) CPU E5-2670 v3 @2.30GHz, 48 CPU processors, 128GB RAM, 12 CPU cores
运行ning under Linux 64 -bit.
我运行正在编写以下代码,它在将不同的数据帧集垂直附加到 RandomForestRegressor
模型的单个数据系列后执行 GridSearchCV
。例如,我正在考虑的两个样本数据集位于 this link
from joblib import Parallel, delayed
import multiprocessing
import sys
import imp
import glob
import os
import pandas as pd
import math
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
import matplotlib
from sklearn.model_selection import cross_val_score
import matplotlib.pyplot as plt
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LassoCV
from sklearn.metrics import r2_score, mean_squared_error, make_scorer
from sklearn.model_selection import train_test_split
from math import sqrt
from sklearn.cross_validation import train_test_split
df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', "cubic*.csv"))), ignore_index=True)
#df = pd.read_csv('cubic31.csv')
for i in range(1,3):
df['X_t'+str(i)] = df['X'].shift(i)
print(df)
df.dropna(inplace=True)
X = (pd.DataFrame({ 'X_%d'%i : df['X'].shift(i) for i in range(3)}).apply(np.nan_to_num, axis=0).values)
X = df.drop('Y', axis=1)
y = df['Y']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.40)
X_train = X_train.drop('time', axis=1)
X_test = X_test.drop('time', axis=1)
print(X.shape)
print(df['Y'].shape)
print()
print("Size of X_train:",(len(X_train)))
print("Size of Y_train:",(len(X_train)))
print("Size of X_test:",(len(X_test)))
print("Size of Y_test:",(len(y_test)))
print()
def gridSearchCVParallel():
#Fit models with some grid search CV=5 (not to low), use the best model
parameters = {'n_estimators': [10,30,100,500,1000]}
clf_rf = RandomForestRegressor(random_state=1)
clf = GridSearchCV(clf_rf, parameters, cv=5, scoring='neg_mean_squared_error')
model = clf.fit(X_train, y_train)
model.cv_results_['params'][model.best_index_]
math.sqrt(model.best_score_*-1)
model.grid_scores_
#####
print()
print(model.grid_scores_)
print("The best score: ",model.best_score_)
print("RMSE:",math.sqrt(model.best_score_*-1))
#reg = RandomForestRegressor(criterion='mse')
clf_rf.fit(X_train,y_train)
modelPrediction = clf_rf.predict(X_test)
print(modelPrediction)
print("Number of predictions:",len(modelPrediction))
meanSquaredError=mean_squared_error(y_test, modelPrediction)
print("Mean Square Error (MSE):", meanSquaredError)
rootMeanSquaredError = sqrt(meanSquaredError)
print("Root-Mean-Square Error (RMSE):", rootMeanSquaredError)
####### to add the trendline
fig, ax = plt.subplots()
#df.plot(x='time', y='Y', ax=ax)
ax.plot(df['time'].values, df['Y'].values)
fig, ax = plt.subplots()
index_values=range(0,len(y_test))
y_test.sort_index(inplace=True)
X_test.sort_index(inplace=True)
modelPred_test = clf_rf.predict(X_test)
ax.plot(pd.Series(index_values), y_test.values)
PlotInOne=pd.DataFrame(pd.concat([pd.Series(modelPred_test), pd.Series(y_test.values)], axis=1))
plt.figure(); PlotInOne.plot(); plt.legend(loc='best')
NumberOfCores = multiprocessing.cpu_count()
gridResults = Parallel(n_jobs=NumberOfCores)(delayed(gridSearchCVParallel))
print(gridResults)
当我最终 运行 这个程序用于一个巨大的数据集(大约 200 万行)时,GridSearchCV
花了 4 天多的时间。经过一番搜索,我发现 Python
线程可以使用多个 CPU 使用 concurrent.futures
或 multiprocessing
。正如我的代码中所示,我尝试使用 multiplrocessing
,但出现此错误 TypeError: 'function' object is not iterable
。这似乎是该函数应该将单个参数作为输入,我们传入一个可迭代对象作为参数。我该如何解决这个问题,以便利用多个 CPU 并在短时间内更快地完成任务?
提前谢谢你。
不要尝试自己并行化。 不要使用 joblib.Parallel
。无论如何,您将重新发明轮子,因为 GridSearchCV
是 已经并行化的 。只需传递 n_jobs
参数,默认为 1
,即默认使用单个作业。要利用多核架构,请传递 n_jobs = number_of_cores
,其中 number_of_cores
是您要使用的内核数。
如果您选中 source code, you'll see it basically wraps a call to joblib.Parallel,那么 n_jobs=-1
应该适用于 "all cores"。