Spark 数据集联合重置 class 个变量

Spark dataset union resets class variables

我有这样的案例class:

case class Ais(NotImportant)
  extends Serializable {


  var flag = Ais.Flag.NotFlagged
  var cluster = Ais.Unknown
  var visited = false

  override def toString(): String = {
    s"$cluster,$flag,$visited"
  }
}

在 运行 我的算法之后,我最终得到两个 (Int,Ais) 类型的数据集,其中 Ais 对象中的变量包含信息。我需要联合他们。对我来说最重要的是 var clustervar visited 的值。然而在 union 之后,它们被重置为默认值。

labeledInner.foreach(println(_)) // This is fine
println("==========")
labeledOuter.foreach(println(_)) // This is also fine
println("==========")
labeledOuter.union(labeledInner).foreach(println(_)) // Here 
                                                // everything set to default

我是 运行 Spark 2.1 和 Scala 2.11.8。

你不应该使用可变 vars 以防万一 classes 在使用 Spark 时 - 这些不 "survive" Spark 的编码,所以任何触发编码和解码的数据集的非平凡使用(如使用 union)不会保留这些字段。

为什么? Spark 具有内置的编码器,旨在有效地将对象编码为字节数组(和返回)。对于 case classes(实际上,对于所有 Products,主要是指 case classes 和元组),编码器只编码 case-class fields 被定义为 class 参数(在你的例子中,只有 NotImportant)。您可以通过为您的案例创建相关编码器 class 并检查其 schema:

来查看这一点
case class A(s: String) {
  var a: Int = 0
}

Encoders.product[A].schema.printTreeString()
// root
// |-- s: string (nullable = true)

如您所见 - 只有 s 幸存下来,a 不是架构的一部分。

还有什么选择?在使用 Spark 时(实际上,通常是 Scala),您应该避免使用 mutable 字段。尝试对数据建模以将所有字段包含为不可变字段,例如:

case class Ais(flag: Flag, cluster: Cluster, visited: Boolean)

然后,对于 "mutate" 这些对象,您可以使用 copy 方法创建一个新实例,其中一些(或 none)字段已更改,例如:

val a = Ais(Ais.Flag.NotFlagged, Ais.Unknown, false)
val b = a.copy(visited = true)

这些对象可以安全地与 Spark 一起使用(它们 "survive" 序列化并且是不可变的)。