How to evaluate the performance of the model (accuracy) in Spark Pipeline with Linear Regression

Trying to run a Spark pipeline with Linear Regression. I am able to execute the model, and I am looking for the following:

  1. To find the efficiency of the model and other metrics, I need the model summary. I found a few Python examples, which I have commented below for reference (a spark.ml-based alternative is also sketched after the code).
        import org.apache.spark.ml.feature.VectorAssembler
        import spark.implicits._
        import org.apache.spark.sql
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.types.DecimalType
        import org.apache.spark.sql.{Dataset, Row, SparkSession}
        import org.apache.spark.ml.regression.LinearRegression
        import org.apache.spark.ml.feature.OneHotEncoderEstimator
        import org.apache.spark.ml.{Pipeline, PipelineModel}
        // needed for the RegressionMetrics usage further below
        import org.apache.spark.mllib.evaluation.RegressionMetrics

        // 50/50 split of the input into training and test sets
        val splitDF: Array[Dataset[Row]] = inputDF.randomSplit(Array(0.5, 0.5))
        val trainingDF = splitDF(0)
        val testingDF = splitDF(1)


        val encoder = new OneHotEncoderEstimator()
          .setInputCols(Array("_LookUpID"))
          .setOutputCols(Array("_LookUpID_Encoded"))

        val requiredFeatures = Array("_LookUpID_Encoded","VALUE1")
        val assembler = new VectorAssembler()
          .setInputCols(requiredFeatures)
          .setOutputCol("features")


        val lr = new LinearRegression()
          .setMaxIter(10)
          .setRegParam(0.3)
          .setElasticNetParam(0.8)
          .setFeaturesCol("features")
          .setLabelCol("VALUE2")

        // Chain the encoder, assembler and regression stages into a pipeline
        val pipeline = new Pipeline()
          .setStages(Array(encoder, assembler, lr))

        // Fit the pipeline to the training data.
        val lrModel = pipeline.fit(trainingDF)

        val predictions = lrModel.transform(testingDF)
        println("*** Predictions ***")
        predictions.printSchema()  

        predictions.select("VALUE_DATE","_LookUpID","_CD","VALUE1","VALUE2","prediction").show(100)

        // Build (prediction, label) pairs by column name instead of positional index,
        // so the metrics do not depend on the column order of the predictions DataFrame
        val rm = new RegressionMetrics(
          predictions.select(col("prediction"), col("VALUE2").cast("double"))
            .rdd.map(r => (r.getDouble(0), r.getDouble(1))))
        println("RMSE: " + Math.sqrt(rm.meanSquaredError))
        println("R Squared: " + rm.r2)
        println("Explained Variance: " + rm.explainedVariance + "\n")

Ingestion with partitioning

    def getDataFrame(sql: String, lowerNumber: Int, upperNumber: Int): DataFrame = {
      val inputDF: DataFrame =
        spark.read.format(source = "jdbc")
          .option("url", "jdbc:oracle:thin:@//url")
          .option("user", "user")
          .option("password", "password")
          .option("driver", "oracle.jdbc.OracleDriver")
          .option("dbtable", s"($sql)")
          .option("partitionColumn", "_LookUpID")
          .option("numPartitions", "6")
          .option("lowerBound", lowerNumber)
          .option("upperBound", upperNumber)
          .load()
      inputDF
    }
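
A hypothetical call for illustration; the query text, the table name and the _LookUpID bounds are placeholders, not the actual ones used:

    // Hypothetical usage; SOME_TABLE and the bounds 1 / 600000 are placeholders.
    // lowerBound/upperBound only control the partition stride on _LookUpID,
    // they do not filter out rows outside that range.
    val inputDF = getDataFrame(
      "SELECT VALUE_DATE, _LookUpID, _CD, VALUE1, VALUE2 FROM SOME_TABLE",
      1, 600000)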
  2. The job runs into memory issues if I feed it a dataset with 1 million rows (it works fine with 100K), even though the job is allocated 32 GB of memory. I tried to .cache() the inputDF, but that did not help. Is it because of the one-hot encoding of _LookUpID, and what else could I do differently? Update: increased the heap memory on the driver as well as the number of partitions, and that resolved it.

Thanks

Updated the question with RegressionMetrics to get metrics such as RMSE and R-squared.

Partitioning the dataset and increasing the heap memory on the driver has resolved the memory issue for now. Will keep an eye on it.
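
For anyone hitting the same wall, this is roughly what the resolution amounts to; the memory size, the partition counts and the balancedDF name below are illustrative, not the exact settings used:

    // Illustrative sketch of the fix; sizes and counts are examples only.
    // 1) Give the driver more heap at submit time (it cannot be raised from inside a running job):
    //      spark-submit --driver-memory 8g ...
    // 2) Read the JDBC source with more partitions so each Oracle slice stays small,
    //    e.g. raise the "numPartitions" option in getDataFrame from 6 to 24.
    // 3) Optionally repartition on the key before the pipeline so the encoding and
    //    vector assembly work on evenly sized partitions:
    import org.apache.spark.sql.functions.col
    val balancedDF = inputDF.repartition(24, col("_LookUpID"))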