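Unable to make prediction with Sklearn model on pyspark dataframe
I have successfully loaded a sklearn model, but I am unable to make predictions on a pyspark dataframe. Running the code below produces the error shown after it. Please help me with code to make predictions on pyspark using a sklearn model. I also searched for related questions but did not find a solution.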
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
braodcast_model.value
# update prediction method
def predictor(cols):
    # call predict method for model
    return model.value.predict(*cols)
udf_predictor = udf(predictor, FloatType())
#apply the udf to dataframe
df_prediction = df.withColumn("prediction", udf_predictor(df.select(list_of_columns)))
I get the following error message:
TypeError: Invalid argument, not a string or column. For column literals, use 'lit', 'array',
'struct' or 'create_map' function.
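I think you are on the right track toward the expected output. The error arises because df.select(list_of_columns) returns a DataFrame, while a UDF must be called with Column objects or column names. Note also that predictor refers to model.value, although the broadcast variable is named braodcast_model.
I found two possible solutions for this kind of problem: one using a Spark UDF, the other using a Pandas UDF.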
Spark UDF
from pyspark.sql.functions import udf
@udf('integer')
def predict_udf(*cols):
    # cols holds the values of one row; wrap it so predict receives a 2D input
    return int(braodcast_model.value.predict([cols])[0])
list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_udf(*list_of_columns))
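To see why the row has to be wrapped before it is passed to predict, the body of the row-wise UDF can be tried locally without Spark. The sketch below is only an illustration under assumptions: local_model is a stand-in pipeline fitted on the breast cancer dataset (not the asker's model), and predict_row is a hypothetical plain function that mirrors predict_udf.

```python
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Stand-in model (assumption): any fitted scikit-learn estimator is called the same way.
X, y = load_breast_cancer(return_X_y=True)
local_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000)).fit(X, y)

def predict_row(*cols):
    # A row-wise UDF receives one scalar per column, so cols is a flat tuple.
    # predict expects a 2D input of shape (n_samples, n_features),
    # hence the extra list around the single row.
    return int(local_model.predict([cols])[0])

first_row = tuple(X[0])
prediction = predict_row(*first_row)
print(prediction)
```

Because the function is invoked once per row, this approach is simple but can be slow on large dataframes.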
Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('integer')
def predict_pandas_udf(*cols):
    # each argument is a pandas Series holding one column of the current batch
    X = pd.concat(cols, axis=1)
    return pd.Series(braodcast_model.value.predict(X))
list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_pandas_udf(*list_of_columns))
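The batch logic of the Pandas UDF can likewise be checked locally with plain pandas. This is a rough sketch under assumptions rather than the code Spark actually runs: local_model is again a stand-in pipeline, and predict_batch is a hypothetical function that mirrors predict_pandas_udf. It also compares the batch result with row-by-row predictions.

```python
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Stand-in model (assumption), fitted on a DataFrame so feature names are known.
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
local_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000)).fit(X, y)

def predict_batch(*cols):
    # A Pandas UDF receives one pandas Series per column for a whole batch of rows.
    features = pd.concat(cols, axis=1)  # rebuild the feature matrix column by column
    return pd.Series(local_model.predict(features))

batch = X.head(5)
series_args = [batch[name] for name in batch.columns]
batch_predictions = predict_batch(*series_args)

# Row-by-row predictions for the same rows, for comparison.
row_predictions = [int(local_model.predict(batch.iloc[[i]])[0]) for i in range(len(batch))]
print(batch_predictions.tolist(), row_predictions)
```

The model is called once per batch instead of once per row, which is the main reason to prefer the Pandas UDF on larger data.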
Reproducible example
Here I used a Databricks Community cluster with Spark 3.1.2, pandas==1.2.4 and pyarrow==4.0.0.
The broadcast model (braodcast_model) is a simple scikit-learn logistic regression trained on the breast cancer dataset.
import pandas as pd
import joblib
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from pyspark.sql.functions import udf, pandas_udf
# load dataset
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
# split in training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=28)
# create a small pipeline with standardization and model
pipe = make_pipeline(StandardScaler(), LogisticRegression())
# fit the pipeline on the training data
model = pipe.fit(X_train, y_train)
# save and reload the model
path = '/databricks/driver/test_model.joblib'
joblib.dump(model, path)
loaded_model = joblib.load(path)
# sample of unseen data
df = spark.createDataFrame(X_test.sample(50, random_state=42))
# create broadcasted model
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
I then applied the two methods shown above; the resulting df_prediction is the same in both cases.