使用自己的方法修改Spark中var类型的数据集
Modifying var type of data set in Spark using its own method
以下是我面临的问题的一个最小示例。我有一个数组,我想就地修改它,因为它有大约一百万个元素。除了最后一条语句之外,以下代码有效。
import spark.implicits._
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
return y.sum
}
def modifier(): Unit = {
for (i <- 0 until y.length) {
y(i) += 10
}
return
}
}
val df = Seq(
(1.0, Array(0, 2, 1)),
(8.0, Array(1, 2, 3)),
(9.0, Array(11, 21, 23))
).toDF("x", "y")
val ds = df.as[Frame]
ds.show
ds.map(_.total()).show // works
ds.map(_.modifier()).show // does not work
错误如下:
scala> ds.map(_.modifier()).show
<console>:50: error: Unable to find encoder for type Unit. An implicit Encoder[Unit] is needed to store Unit instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
ds.map(_.modifier()).show
我看不出问题的根源。如果您能帮助修复错误,我将不胜感激。
实际上,这与'var'或'val'无关,它与可变数据结构有关。问题是 modifier
returns Unit
(例如什么都没有),所以你不能映射这个结果。您可以 运行 它使用:
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
return y.sum
}
def modifier(): Frame = {
for (i <- 0 until y.length) {
y(i) += 10
}
return this
}
}
但我认为没有多大意义,你应该避免可变状态。此外,我会在 spark 中保持 case 类 简单(即没有逻辑),仅将它们用作数据容器。如果到那时你必须增加每个元素,你也可以这样做:
case class Frame(x: Double, val y: Array[Double])
ds.map(fr => fr.copy(y = fr.y.map(_+10.0))).show
以下是我面临的问题的一个最小示例。我有一个数组,我想就地修改它,因为它有大约一百万个元素。除了最后一条语句之外,以下代码有效。
import spark.implicits._
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
return y.sum
}
def modifier(): Unit = {
for (i <- 0 until y.length) {
y(i) += 10
}
return
}
}
val df = Seq(
(1.0, Array(0, 2, 1)),
(8.0, Array(1, 2, 3)),
(9.0, Array(11, 21, 23))
).toDF("x", "y")
val ds = df.as[Frame]
ds.show
ds.map(_.total()).show // works
ds.map(_.modifier()).show // does not work
错误如下:
scala> ds.map(_.modifier()).show
<console>:50: error: Unable to find encoder for type Unit. An implicit Encoder[Unit] is needed to store Unit instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
ds.map(_.modifier()).show
我看不出问题的根源。如果您能帮助修复错误,我将不胜感激。
实际上,这与'var'或'val'无关,它与可变数据结构有关。问题是 modifier
returns Unit
(例如什么都没有),所以你不能映射这个结果。您可以 运行 它使用:
case class Frame(x: Double, var y: Array[Double]) {
def total(): Double = {
return y.sum
}
def modifier(): Frame = {
for (i <- 0 until y.length) {
y(i) += 10
}
return this
}
}
但我认为没有多大意义,你应该避免可变状态。此外,我会在 spark 中保持 case 类 简单(即没有逻辑),仅将它们用作数据容器。如果到那时你必须增加每个元素,你也可以这样做:
case class Frame(x: Double, val y: Array[Double])
ds.map(fr => fr.copy(y = fr.y.map(_+10.0))).show