Dataset.reduce 不支持 shorthand 功能

Dataset.reduce doesn't support shorthand function

我有一个简单的代码:

test("0153") {
  val c = Seq(1,8,4,2,7)
  val max = (x:Int, y:Int)=> if (x > y) x else y
  c.reduce(max)
}

它工作正常。但是,当我按照相同的方式使用 Dataset.reduce

test("SparkSQLTest") {
  def max(x: Int, y: Int) = if (x > y) x else y
  val spark = SparkSession.builder().master("local").appName("SparkSQLTest").enableHiveSupport().getOrCreate()
  val ds = spark.range(1, 100).map(_.toInt)
  ds.reduce(max) //compiling error:Error:(20, 15) missing argument list for method max
}

编译器抱怨 missing argument list for method max,我不知道这是怎么回事。

根据 spark scala doc,reduce 函数签名是 reduce(func: ReduceFunction[T]): T and reduce(func: (T, T) ⇒ T): T 所以以下任一方法都可以工作

方法一:

scala> val ds = spark.range(1, 100).map(_.toInt)
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> def max(x: Int, y: Int) = if (x > y) x else y
max: (x: Int, y: Int)Int

scala> ds.reduce((x, y) => max(x,y))
res1: Int = 99

方法 2 [如果你坚持像 reduce(max) 这样的简写符号]:

scala> val ds = spark.range(1, 100).map(_.toInt)
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> object max extends org.apache.spark.api.java.function.ReduceFunction[Int]{
     | def call(x:Int, y:Int) = {if (x > y) x else y}
     | }
defined object max

scala> ds.reduce(max)
res3: Int = 99

希望这对您有所帮助!

更改为函数而不是方法,它应该可以工作,即而不是

def max(x: Int, y: Int) = if (x > y) x else y

使用

val max = (x: Int, y: Int) => if (x > y) x else y

使用函数,使用ds.reduce(max)应该可以直接使用。有关差异的更多信息,请参见 here.


否则,正如 hadooper 指出的那样,您可以通过提供参数来使用该方法,

def max(x: Int, y: Int) = if (x > y) x else y
ds.reduce((x, y) => max(x,y))