如何解决类型不匹配问题(预期:Double,实际:Unit)

How to solve Type mismatch issue (expected: Double, actual: Unit)

这是我计算均方根误差的函数。但是由于错误 Type mismatch issue (expected: Double, actual: Unit),最后一行无法编译。我尝试了很多不同的方法来解决这个问题,但仍然没有成功。有什么想法吗?

  def calculateRMSE(output: DStream[(Double, Double)]): Double = {
        val summse = output.foreachRDD { rdd =>
          rdd.map {
              case pair: (Double, Double) =>
                val err = math.abs(pair._1 - pair._2);
                err*err
          }.reduce(_ + _)
        }
        // math.sqrt(summse)  HOW TO APPLY SQRT HERE?
  }

我完成了如下任务:

import org.apache.spark.mllib.evaluation.RegressionMetrics

output.foreachRDD { rdd =>
  if (!rdd.isEmpty)
    {
      val metrics = new RegressionMetrics(rdd)
      val rmse = metrics.rootMeanSquaredError
      println("RMSE: " + rmse)
    }
}

正如 eliasah 指出的那样,foreach(和 foreachRDD)不 return 一个值;它们仅用于副作用。如果你想 return 某些东西,你需要 map。基于你的第二个解决方案:

val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError)

如果给它做一个小功能看起来会更好:

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError

val rmse = output.map(getRmse)

忽略空 RDD,

val rmse = output.filter(_.nonEmpty).map(getRmse)

这是与理解完全相同的序列。只是map,flatMap,filter的语法糖,不过我初学Scala的时候觉得更容易理解:

val rmse = for {
  rdd <- output
  if (rdd.nonEmpty)
} yield new RegressionMetrics(rdd).rootMeanSquaredError

这是一个总结错误的函数,就像您的第一次尝试:

def calculateRmse(output: DStream[(Double, Double)]): Double = {

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError

output.filter(_.nonEmpty).map(getRmse).reduce(_+_)
}

编译器对nonEmpty的抱怨实际上是DStream的filter方法的问题。 filter 不是对 DStream 中的 RDD 进行操作,而是对由 DStream 的类型参数给出的双打 (Double, Double) 对进行操作。

我对 Spark 的了解还不够,不能说这是一个 缺陷 ,但它很奇怪。 Filter 和大多数其他集合操作通常是 defined in terms of foreach, but DStream implements those functions without following the same convention; its deprecated method foreach and current foreachRDD both operate over the stream's RDDs, but its other higher-order methods don't.

所以我的方法行不通。 DStream 可能有一个很好的理由变得奇怪(与性能相关?)这可能是用 foreach:

做的不好的方法
def calculateRmse(ds: DStream[(Double, Double)]): Double = {

  var totalError: Double = 0

  def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError

  ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd))

  totalError
}

但它有效!