Apache Spark 中的线性回归给出了错误的截距和权重

Linear regression in Apache Spark giving wrong intercept and weights

将 MLLib LinearRegressionWithSGD 用于虚拟数据集 (y, x1, x2) for y = (2*x1) + (3*x2) + 4 会产生错误的截距和权重。实际使用的数据是,

x1  x2  y
1   0.1 6.3
2   0.2 8.6
3   0.3 10.9
4   0.6 13.8
5   0.8 16.4
6   1.2 19.6
7   1.6 22.8
8   1.9 25.7
9   2.1 28.3
10  2.4 31.2
11  2.7 34.1

我设置了以下输入参数并得到了以下模型输出 [numIterations, step, miniBatchFraction, regParam] [截距, [权重]]

  1. [5,9,0.6,5] = [2.36667135839938E13, 权重:[1.708772545209758E14, 3.849548062850367E13] ]
  2. [2,默认,默认,默认] = [-2495.5635231554793, 权重:[-19122.41357929275,-4308.224496146531]]
  3. [5,默认,默认,默认] = [2.875191315671051E8, 权重: [2.2013802074495964E9,4.9593017130199933E8]]
  4. [20,默认,默认,默认] = [-8.896967235537095E29, 权重: [-6.811932001659158E30,-1.53​​46020624812824E30]]

需要知道,

  1. 如何获得上述虚拟数据的正确截距和权重 [4, [2, 3]]。
  2. 调整步长有助于收敛吗?我需要 运行 以自动方式对数百个变量进行此操作,所以我不想这样做。
  3. 我应该缩放数据吗?它有什么帮助?

下面是用于生成这些结果的代码。

object SciBenchTest {

  def main(args: Array[String]): Unit = run

  def run: Unit = {

    val sparkConf = new SparkConf().setAppName("SparkBench")
    val sc = new SparkContext(sparkConf)

    // Load and parse the dummy data (y, x1, x2) for y = (2*x1) + (3*x2) + 4
    // i.e. intercept should be 4, weights (2, 3)?
    val data = sc.textFile("data/dummy.csv")

    // LabeledPoint is (label, [features])
    val parsedData = data.map { line =>
      val parts = line.split(',')
      val label = parts(2).toDouble
      val features = Array(parts(0), parts(1)) map (_.toDouble)
      LabeledPoint(label, Vectors.dense(features))
    }
    //parsedData.collect().foreach(x => println(x));

    // Scale the features
    /*val scaler = new StandardScaler(withMean = true, withStd = true)
      .fit(parsedData.map(x => x.features))
    val scaledData = parsedData
      .map(x =>
      LabeledPoint(x.label,
        scaler.transform(Vectors.dense(x.features.toArray))))

    scaledData.collect().foreach(x => println(x));*/

    // Building the model: SGD = stochastic gradient descent
    val numIterations = 20 //5
    val step = 9.0 //9.0 //0.7
    val miniBatchFraction = 0.6 //0.7 //0.65 //0.7
    val regParam = 5.0 //3.0 //10.0
    //val model = LinearRegressionWithSGD.train(parsedData, numIterations, step) //scaledData

    val algorithm = new LinearRegressionWithSGD()       //train(parsedData, numIterations)
    algorithm.setIntercept(true)
    algorithm.optimizer
      //.setMiniBatchFraction(miniBatchFraction)
      .setNumIterations(numIterations)
      //.setStepSize(step)
      //.setGradient(new LeastSquaresGradient())
      //.setUpdater(new SquaredL2Updater()) //L1Updater //SimpleUpdater //SquaredL2Updater
      //.setRegParam(regParam)

    val model = algorithm.run(parsedData)

    println(s">>>> Model intercept: ${model.intercept}, weights: ${model.weights}")

    // Evaluate model on training examples
    val valuesAndPreds = parsedData.map { point =>
      val prediction = model.predict(point.features)
      (point.label, point.features, prediction)
    }
    // Print out features, actual and predicted values...
    valuesAndPreds.take(10).foreach({ case (v, f, p) =>
      println(s"Features: ${f}, Predicted: ${p}, Actual: ${v}")
    })
  }
}

如文档中所述 https://spark.apache.org/docs/1.0.2/mllib-optimization.html 为 SGD 方法选择最佳步长通常很微妙。

我会尝试情人价值观,例如

// Build linear regression model
var regression = new LinearRegressionWithSGD().setIntercept(true)
regression.optimizer.setStepSize(0.001)
val model = regression.run(parsedData)

添加步长对我们帮助不大。

我们使用以下参数来计算 intercept/weights 和损失,并使用它们来构建线性回归模型以预测我们的特征。感谢@selvinsource 为我指明了正确的方向。

  val data = sc.textFile("data/dummy.csv")

  // LabeledPoint is (label, [features])
  val parsedData = data.map { line =>
    val parts = line.split(',')
    val label = parts(2).toDouble
    val features = Array(parts(0), parts(1)) map (_.toDouble)
    (label, MLUtils.appendBias(Vectors.dense(features)))
  }.cache()

  val numCorrections = 5 //10//5//3
  val convergenceTol = 1e-4 //1e-4
  val maxNumIterations = 20 //20//100
  val regParam = 0.00001 //0.1//10.0

  val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
    parsedData,
    new LeastSquaresGradient(),//LeastSquaresGradient
    new SquaredL2Updater(), //SquaredL2Updater(),SimpleUpdater(),L1Updater()
    numCorrections,
    convergenceTol,
    maxNumIterations,
    regParam,
    Vectors.dense(0.0, 0.0, 0.0))//initialWeightsWithIntercept)

  loss.foreach(println)

  val model = new LinearRegressionModel(
    Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
    weightsWithIntercept(weightsWithIntercept.size - 1))

  println(s">>>> Model intercept: ${model.intercept}, weights: ${model.weights}")

  // Evaluate model on training examples
  val valuesAndPreds = parsedData.collect().map { point =>
    var prediction = model.predict(Vectors.dense(point._2.apply(0), point._2.apply(1)))
    (prediction, point._1)
  }

  // Print out features, actual and predicted values...
  valuesAndPreds.take(10).foreach({ case (v, f) =>
    println(s"Features: ${f}, Predicted: ${v}")//, Actual: ${v}")
  })