Spark:创建新的累加器类型将不起作用(Scala)

Spark: Create new accumulator type won't work (Scala)

我想为 List[(String, String)] 类型的列表创建一个累加器。我首先创建了以下对象:

object ListAccumulator extends AccumulatorParam[List[(String, String)]] {
  def zero(initialValue: List[(String, String)]): List[(String, String)] = {
    Nil
  }

  def addInPlace(list1: List[(String, String)], list2: List[(String, String)]): List[(String, String)] = {
    list1 ::: list2
  }
}

在同一个文件 (SparkQueries.scala) 中,我试图在 class:

的一个函数中使用它
val resultList = sc.accumulator(Nil)(ListAccumulator)

但是,我的编译器在这里抱怨 (ListAccumulator)。出现以下错误:

type mismatch; found : sparkMain.ListAccumulator.type required: org.apache.spark.AccumulatorParam[scala.collection.immutable.Nil.type] Note: List[(String, String)] >: scala.collection.immutable.Nil.type (and sparkMain.ListAccumulator.type <: org.apache.spark.AccumulatorParam[List[(String, String)]]), but trait AccumulatorParam is invariant in type T. You may wish to define T as -T instead.

sparkMain 是 .scala 文件所在的包。我做错了什么?编译器是否可能不知道 ListAccumulator 对象的存在?

提前致谢!

您可以像这样修复您的类型错误:

val resultList = sc.accumulator(ListAccumulator.zero(Nil))(ListAccumulator)

Scala 中的类型推断器有问题,假设最具体的类型(Nil,空列表的类型)是您想要的累加器类型。通过使用 zero,显式 return 类型的 List[(String, String)],您可以帮助它理解您的意思。

附带说明:您正在为 addInPlace 使用列表串联,这与列表的大小成线性关系。如果您的列表变大,您的添加会很慢。如果您需要高效的追加,请使用 ListBufferArrayBuffer,或者如果您想要不可变序列,请使用 Vector