使用 Pipeline 基于分区 DataFrame 创建许多 Spark MLlib 模型
Create many Spark MLlib models based on partitioned DataFrame using Pipeline
scala> spark.version
res8: String = 2.2.0
我正在使用包含列 locationID
的 spark Dataframe。我已经创建了一个 MLlib 管道来构建线性回归模型,当我为它提供单个 locationID
的数据时它可以工作。我现在想为每个 'locationID' 创建许多模型(生产中可能有几千个 locationID)。我想保存每个模型的模型系数。
我不确定如何在 Scala 中完成此操作。
我的管道是这样定义的:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql
// Load the regression input data
val mydata = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./inputdata.csv")
// Crate month one hot encoding
val monthIndexer = new StringIndexer()
.setInputCol("month")
.setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
.setInputCol(monthIndexer.getOutputCol)
.setOutputCol("monthVec")
val assembler = new VectorAssembler()
.setInputCols(Array("monthVec","tran_adr"))
.setOutputCol("features")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val pipeline = new Pipeline()
.setStages(Array(monthIndexer, monthEncoder, assembler, lr))
// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)
然后我可以像这样提取模型详细信息:
val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]
println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
现在我想对在 mydata
中找到的列 locationID
和每个数据分区上的管道 运行 进行分组。
我试过使用 groupby 但我只能聚合。
val grouped = mydata.groupBy("locationID")
我还尝试将唯一的 locationID
作为列表拉出并循环遍历它:
val locationList = mydata.select(mydata("prop_code")).distinct
locationList.foreach { printLn }
我知道 spark 不是创建许多较小模型的理想选择,它最适合在大量数据上创建一个模型,但我的任务是将其作为概念验证。
在 spark 中做这样的事情的正确方法是什么?
What is the correct approach for doing something like this in spark?
我敢说根本没有好的方法。有许多可以处理核心数据处理的高级工具和许多可用于编排独立学习任务的任务调度库。 Spark 在这里根本不提供任何东西。
它的调度能力很一般,ML / MLlib 工具也是如此,当每个任务独立时,缩放和容错都没有用。
您可以将 Spark 用于通用目的调度(如果您不介意使用 Python,可以使用 sklearn keyed models 实现这个想法),仅此而已。
我运行遇到了同样的问题。我的数据在 $"description_pretty" 上分区,这就是我处理它的方式。我将数据框拆分到它的分区上,将其送入管道,select 相关列,然后将其联合在一起。
val pipe = new Pipeline().setStages(Array(encoder, assembler,
multivariate_linear_model))
val descriptions_pretty = training_df.select("description_pretty").
distinct.
as[String].
rdd.
collect
val model_predictions_df = descriptions_pretty.par.
map(x => pipe.fit(training_df.filter($"description_pretty" === x)).
transform(prediction_df.filter($"description_pretty" === x)).
select($"description", $"description_pretty",
$"standard_event_date".cast("String"),
$"prediction".as("daily_peak_bps"))).
reduce( _ union _)
您可以在 .t运行sform 之前停止,而是抓取系数
scala> spark.version
res8: String = 2.2.0
我正在使用包含列 locationID
的 spark Dataframe。我已经创建了一个 MLlib 管道来构建线性回归模型,当我为它提供单个 locationID
的数据时它可以工作。我现在想为每个 'locationID' 创建许多模型(生产中可能有几千个 locationID)。我想保存每个模型的模型系数。
我不确定如何在 Scala 中完成此操作。
我的管道是这样定义的:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql
// Load the regression input data
val mydata = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./inputdata.csv")
// Crate month one hot encoding
val monthIndexer = new StringIndexer()
.setInputCol("month")
.setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
.setInputCol(monthIndexer.getOutputCol)
.setOutputCol("monthVec")
val assembler = new VectorAssembler()
.setInputCols(Array("monthVec","tran_adr"))
.setOutputCol("features")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val pipeline = new Pipeline()
.setStages(Array(monthIndexer, monthEncoder, assembler, lr))
// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)
然后我可以像这样提取模型详细信息:
val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]
println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
现在我想对在 mydata
中找到的列 locationID
和每个数据分区上的管道 运行 进行分组。
我试过使用 groupby 但我只能聚合。
val grouped = mydata.groupBy("locationID")
我还尝试将唯一的 locationID
作为列表拉出并循环遍历它:
val locationList = mydata.select(mydata("prop_code")).distinct
locationList.foreach { printLn }
我知道 spark 不是创建许多较小模型的理想选择,它最适合在大量数据上创建一个模型,但我的任务是将其作为概念验证。
在 spark 中做这样的事情的正确方法是什么?
What is the correct approach for doing something like this in spark?
我敢说根本没有好的方法。有许多可以处理核心数据处理的高级工具和许多可用于编排独立学习任务的任务调度库。 Spark 在这里根本不提供任何东西。
它的调度能力很一般,ML / MLlib 工具也是如此,当每个任务独立时,缩放和容错都没有用。
您可以将 Spark 用于通用目的调度(如果您不介意使用 Python,可以使用 sklearn keyed models 实现这个想法),仅此而已。
我运行遇到了同样的问题。我的数据在 $"description_pretty" 上分区,这就是我处理它的方式。我将数据框拆分到它的分区上,将其送入管道,select 相关列,然后将其联合在一起。
val pipe = new Pipeline().setStages(Array(encoder, assembler,
multivariate_linear_model))
val descriptions_pretty = training_df.select("description_pretty").
distinct.
as[String].
rdd.
collect
val model_predictions_df = descriptions_pretty.par.
map(x => pipe.fit(training_df.filter($"description_pretty" === x)).
transform(prediction_df.filter($"description_pretty" === x)).
select($"description", $"description_pretty",
$"standard_event_date".cast("String"),
$"prediction".as("daily_peak_bps"))).
reduce( _ union _)
您可以在 .t运行sform 之前停止,而是抓取系数