Scala 编译器无法在 Spark lambda 函数中推断类型

Scala compiler failed to infer type inside Spark lambda function

假设我有这段用 Scala 2.12 编写的 Spark 代码

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( partition => partition.foreach {
      entry: String => println(entry)
    })

当我运行代码时,编译器给出了这个错误


[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error]     empty.foreachPartition( partition => partition.foreach{
[error]                                                    ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM


为什么编译器 partition 作为 Object 而不是 Iterator[String]

我必须手动添加 partition 类型才能使代码正常工作。

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
      entry: String => println(entry)
    })

这是因为 foreachPartition 和 Java-Scala 互操作的两个重载版本。

如果代码仅在 Scala 中(这是最少的代码并且独立于 Spark)

val dataset: Dataset[String] = ???

dataset.foreachPartition(partition => ???)

class Dataset[T] {
  def foreachPartition(f: Iterator[T] => Unit): Unit = ???
  def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}

trait ForeachPartitionFunction[T] extends Serializable {
  def call(t: Iterator[T]): Unit
}

然后将推断 partition 的类型(如 scala.collection.Iterator[String])。

但在实际的 Spark 代码中 ForeachPartitionFunction 是 Java 接口,其方法 call 接受 java.util.Iterator[String].

所以两个选项

dataset.foreachPartition((
  (partition: scala.collection.Iterator[String]) => ??? 
): Iterator[String] => Unit)

dataset.foreachPartition((
  (partition: java.util.Iterator[String]) => ??? 
): ForeachPartitionFunction[String])

符合条件,编译器无法推断 partition 的类型。

并且 Scala 中的推理是本地的,因此在编译器可以看到 partition => partition.foreach...(并且 java.util.Iterator[String] 没有方法 foreach)之后再输入就太晚了 partition.

正如@Dmytro 所说,scala 编译器无法推断它应该应用哪个重载函数。然而,您可以使用一个简单的解决方法,方法是使用此辅助函数:

  def helper[I](f: I => Unit): I => Unit = f

现在您需要做的就是:

dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
      helper[String](entry => println(entry))
    })