How to evaluate the performance (accuracy) of a model in a Spark Pipeline with Linear Regression
Trying to run a Spark pipeline with Linear Regression. I am able to fit the model, and I am looking for
- the model's efficiency and other metrics, for which I need the model summary; I found some Python examples, which I have noted below for reference (see the summary/evaluator sketch after the code below).
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
// Needed for the RegressionMetrics call further down
import org.apache.spark.mllib.evaluation.RegressionMetrics
// 50/50 random split into training and test sets
val Array(trainingDF, testingDF) = inputDF.randomSplit(Array(0.5, 0.5))
// One-hot encode the numeric lookup ID (OneHotEncoderEstimator, Spark 2.3+)
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("_LookUpID"))
  .setOutputCols(Array("_LookUpID_Encoded"))
// Assemble the encoded ID and VALUE1 into a single feature vector
val requiredFeatures = Array("_LookUpID_Encoded", "VALUE1")
val assembler = new VectorAssembler()
  .setInputCols(requiredFeatures)
  .setOutputCol("features")
// Elastic-net regularized linear regression predicting VALUE2
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFeaturesCol("features")
  .setLabelCol("VALUE2")
// Chain encoder, assembler, and regression into one pipeline
val pipeline = new Pipeline()
  .setStages(Array(encoder, assembler, lr))
// Fit the pipeline to the training data
val lrModel = pipeline.fit(trainingDF)
val predictions = lrModel.transform(testingDF)
println("*** Predictions ***")
predictions.printSchema()
predictions.select("VALUE_DATE","_LookUpID","_CD","VALUE1","VALUE2","prediction").show(100)
// RegressionMetrics (from spark.mllib) expects (prediction, observation)
// pairs; select the columns by name, since the pipeline appends columns
// and positional indexes like x(4)/x(5) no longer line up
val rm = new RegressionMetrics(
  predictions.select(col("prediction"), col("VALUE2").cast("double")).rdd
    .map(r => (r.getDouble(0), r.getDouble(1))))
println("RMSE: " + rm.rootMeanSquaredError)
println("R Squared: " + rm.r2)
println("Explained Variance: " + rm.explainedVariance + "\n")
Ingestion with partitioning
def getDataFrame(sql: String, lowerNumber: Int, upperNumber: Int): DataFrame = {
  val inputDF: DataFrame =
    spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//url")
      .option("user", "user")
      .option("password", "password")
      .option("driver", "oracle.jdbc.OracleDriver")
      .option("dbtable", s"($sql)")
      // Split the read into 6 parallel JDBC queries, striped over
      // _LookUpID between lowerBound and upperBound
      .option("partitionColumn", "_LookUpID")
      .option("numPartitions", "6")
      .option("lowerBound", lowerNumber)
      .option("upperBound", upperNumber)
      .load()
  inputDF
}
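For illustration, a hypothetical invocation (the query text and the ID range are placeholders, not the original job's values):

val sql = "SELECT _LookUpID, VALUE_DATE, _CD, VALUE1, VALUE2 FROM SOME_TABLE" // placeholder query
val inputDF = getDataFrame(sql, lowerNumber = 1, upperNumber = 1000000)
// Spark issues 6 parallel JDBC reads, each covering a slice of
// _LookUpID between the lower and upper bounds
println(inputDF.rdd.getNumPartitions)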
- The job runs out of memory if I feed it a dataset with 1 million rows (100K works fine), even with 32GB allocated to it. I tried .cache() on inputDF without success. Is this because of the one-hot encoding of _LookUpID, and what else could I do differently?
Update: increased the heap memory on the driver as well as the number of partitions, and that resolved it (a config sketch follows below).
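For reference, a hedged sketch of how those two knobs might be set; the 8g and 12 values are illustrative, not the ones actually used:

// Driver heap must be set before the driver JVM starts, i.e. at submit
// time rather than in code:
//   spark-submit --driver-memory 8g ...   (illustrative value)
// The JDBC read parallelism comes from numPartitions/bounds above; the
// loaded DataFrame can additionally be repartitioned before the pipeline:
val repartitionedDF = inputDF.repartition(12, col("_LookUpID")) // illustrative count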
Thanks
Updated the question with RegressionMetrics to get metrics such as RMSE and R-squared.
Partitioning the dataset and increasing the driver's heap memory resolved the memory issue for now. Will keep monitoring.