使用自己的方法修改Spark中var类型的数据集

Modifying var type of data set in Spark using its own method

以下是我面临的问题的一个最小示例。我有一个数组,我想就地修改它,因为它有大约一百万个元素。除了最后一条语句之外,以下代码有效。

 import spark.implicits._

 case class Frame(x: Double, var y: Array[Double]) {
    def total(): Double = {
        return y.sum
    }
    def modifier(): Unit = {
        for (i <- 0 until y.length) {
            y(i) += 10
        }
        return
    }
 }

 val df = Seq(
               (1.0, Array(0, 2, 1)),
               (8.0, Array(1, 2, 3)),
               (9.0, Array(11, 21, 23))
             ).toDF("x", "y")

 val ds = df.as[Frame]
 ds.show

 ds.map(_.total()).show     // works
 ds.map(_.modifier()).show  // does not work

错误如下:

scala> ds.map(_.modifier()).show
<console>:50: error: Unable to find encoder for type Unit. An implicit Encoder[Unit] is needed to store Unit instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       ds.map(_.modifier()).show

我看不出问题的根源。如果您能帮助修复错误,我将不胜感激。

实际上,这与'var'或'val'无关,它与可变数据结构有关。问题是 modifier returns Unit (例如什么都没有),所以你不能映射这个结果。您可以 运行 它使用:

case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
  return y.sum
}
def modifier(): Frame = {
  for (i <- 0 until y.length) {
    y(i) += 10
  }
  return this
}

}

但我认为没有多大意义,你应该避免可变状态。此外,我会在 spark 中保持 case 类 简单(即没有逻辑),仅将它们用作数据容器。如果到那时你必须增加每个元素,你也可以这样做:

case class Frame(x: Double, val y: Array[Double])

ds.map(fr => fr.copy(y = fr.y.map(_+10.0))).show