Scala 编译器无法在 Spark lambda 函数中推断类型
Scala compiler failed to infer type inside Spark lambda function
假设我有这段用 Scala 2.12 编写的 Spark 代码
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach {
entry: String => println(entry)
})
当我运行代码时,编译器给出了这个错误
[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error] empty.foreachPartition( partition => partition.foreach{
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM
为什么编译器 partition
作为 Object
而不是 Iterator[String]
?
我必须手动添加 partition
类型才能使代码正常工作。
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
entry: String => println(entry)
})
这是因为 foreachPartition
和 Java-Scala 互操作的两个重载版本。
如果代码仅在 Scala 中(这是最少的代码并且独立于 Spark)
val dataset: Dataset[String] = ???
dataset.foreachPartition(partition => ???)
class Dataset[T] {
def foreachPartition(f: Iterator[T] => Unit): Unit = ???
def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}
trait ForeachPartitionFunction[T] extends Serializable {
def call(t: Iterator[T]): Unit
}
然后将推断 partition
的类型(如 scala.collection.Iterator[String]
)。
但在实际的 Spark 代码中 ForeachPartitionFunction
是 Java 接口,其方法 call
接受 java.util.Iterator[String]
.
所以两个选项
dataset.foreachPartition((
(partition: scala.collection.Iterator[String]) => ???
): Iterator[String] => Unit)
dataset.foreachPartition((
(partition: java.util.Iterator[String]) => ???
): ForeachPartitionFunction[String])
符合条件,编译器无法推断 partition
的类型。
并且 Scala 中的推理是本地的,因此在编译器可以看到 partition => partition.foreach...
(并且 java.util.Iterator[String]
没有方法 foreach
)之后再输入就太晚了 partition
.
正如@Dmytro 所说,scala 编译器无法推断它应该应用哪个重载函数。然而,您可以使用一个简单的解决方法,方法是使用此辅助函数:
def helper[I](f: I => Unit): I => Unit = f
现在您需要做的就是:
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
helper[String](entry => println(entry))
})
假设我有这段用 Scala 2.12 编写的 Spark 代码
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach {
entry: String => println(entry)
})
当我运行代码时,编译器给出了这个错误
[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error] empty.foreachPartition( partition => partition.foreach{
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM
为什么编译器 partition
作为 Object
而不是 Iterator[String]
?
我必须手动添加 partition
类型才能使代码正常工作。
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
entry: String => println(entry)
})
这是因为 foreachPartition
和 Java-Scala 互操作的两个重载版本。
如果代码仅在 Scala 中(这是最少的代码并且独立于 Spark)
val dataset: Dataset[String] = ???
dataset.foreachPartition(partition => ???)
class Dataset[T] {
def foreachPartition(f: Iterator[T] => Unit): Unit = ???
def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}
trait ForeachPartitionFunction[T] extends Serializable {
def call(t: Iterator[T]): Unit
}
然后将推断 partition
的类型(如 scala.collection.Iterator[String]
)。
但在实际的 Spark 代码中 ForeachPartitionFunction
是 Java 接口,其方法 call
接受 java.util.Iterator[String]
.
所以两个选项
dataset.foreachPartition((
(partition: scala.collection.Iterator[String]) => ???
): Iterator[String] => Unit)
dataset.foreachPartition((
(partition: java.util.Iterator[String]) => ???
): ForeachPartitionFunction[String])
符合条件,编译器无法推断 partition
的类型。
并且 Scala 中的推理是本地的,因此在编译器可以看到 partition => partition.foreach...
(并且 java.util.Iterator[String]
没有方法 foreach
)之后再输入就太晚了 partition
.
正如@Dmytro 所说,scala 编译器无法推断它应该应用哪个重载函数。然而,您可以使用一个简单的解决方法,方法是使用此辅助函数:
def helper[I](f: I => Unit): I => Unit = f
现在您需要做的就是:
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
helper[String](entry => println(entry))
})