Pyspark ML - How to save pipeline and RandomForestClassificationModel
I am unable to save the random forest model generated using the ml package of python/spark.
>>> rf = RandomForestClassifier(labelCol="label", featuresCol="features")
>>> pipeline = Pipeline(stages=early_stages + [rf])
>>> model = pipeline.fit(trainingData)
>>> model.save("fittedpipeline")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'PipelineModel' object has no attribute 'save'
>>> rfModel = model.stages[8]
>>> print(rfModel)
RandomForestClassificationModel (uid=rfc_46c07f6d7ac8) with 20 trees
>>> rfModel.save("rfmodel")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'RandomForestClassificationModel' object has no attribute 'save'
I also tried passing 'sc' as the first argument to the save method.
The main issue with your code is that you are using a version of Apache Spark prior to 2.0.0. Thus, save is not available yet for the Pipeline API.
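You can confirm which version you are running directly in the shell; sc.version is available on any live SparkContext, the same sc as in your snippet (the value shown here is a hypothetical pre-2.0 version matching the error above):

>>> sc.version
'1.6.3'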
Here is a full example compounded from the official documentation. Let's create the pipeline first:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
labels = label_indexer.fit(data).labels
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
feature_indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)
early_stages = [label_indexer, feature_indexer]
# Split the data into training and test sets (30% held out for testing)
(train, test) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=early_stages + [rf, label_converter])
# Train model. This also runs the indexers.
model = pipeline.fit(train)
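Before saving, you can sanity-check the fitted pipeline on the held-out split; a quick sketch, using the test split and the column names defined above:

# Make predictions on the held-out test set and inspect a few rows
predictions = model.transform(test)
predictions.select("predictedLabel", "label", "features").show(5)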
You can now save the pipeline:
>>> model.save("/tmp/rf")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
You can also save the RF model:
>>> rf_model = model.stages[2]
>>> print(rf_model)
RandomForestClassificationModel (uid=rfc_b368678f4122) with 10 trees
>>> rf_model.save("/tmp/rf_2")
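To read these artifacts back in another session, use the matching model classes; a minimal sketch, assuming the paths saved above:

from pyspark.ml import PipelineModel
from pyspark.ml.classification import RandomForestClassificationModel

# Each saved artifact is loaded by the class that produced it
same_model = PipelineModel.load("/tmp/rf")
same_rf_model = RandomForestClassificationModel.load("/tmp/rf_2")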
You can save both pipelines and models. When loading them back, you need to know a priori the kind of model each saved path corresponds to. For example:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer, OneHotEncoderEstimator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
df = *YOUR DATAFRAME*
categoricalColumns = ["A", "B", "C"]
stages = []
for categoricalCol in categoricalColumns:
    # Index each categorical column, then one-hot encode the resulting index
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
# Index the raw label column ("id_imp") into a numeric "label" column
label_stringIdx = StringIndexer(inputCol="id_imp", outputCol="label")
stages += [label_stringIdx]
# Assemble the one-hot vectors into a single "features" column
assemblerInputs = [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
pipelineModel.save("/path")
In the previous case, I saved a pipeline with different stages.
Now, if you want to use it:
# A fitted pipeline is loaded with PipelineModel, not Pipeline
pipelineModel = PipelineModel.load("/path")
df = pipelineModel.transform(df)
You can do the same for other cases, for example:
# lr, paramGrid and evaluator are assumed to be defined beforehand
# (an estimator, a ParamGridBuilder grid and an evaluator, respectively)
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=100)
cvModel = cv.fit(trainingData)
cvModel.save("/path")
cvM = CrossValidatorModel.load("/path")
predictions2 = cvM.transform(testData)
predictions = cvModel.transform(testData)
In short, if you want to load a model back, you need to use the corresponding model object.
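As a recap, here are the save/load pairings used in this thread (all available from Spark 2.0.0 onward):

# Fitted objects and their corresponding loaders:
#   PipelineModel.save(path)                    -> PipelineModel.load(path)
#   RandomForestClassificationModel.save(path)  -> RandomForestClassificationModel.load(path)
#   CrossValidatorModel.save(path)              -> CrossValidatorModel.load(path)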