如何舍入 Spark 数据集中的列?

How can I round a column in a Spark Dataset?

使用 Scala Spark,如何使用类型化数据集 API 舍入聚合列?

另外,如何通过groupby操作保留数据集的类型?

这是我目前拥有的:

case class MyRow(
  k1: String,
  k2: String,
  c1: Double,
  c2: Double
)

def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(row => (row.k1, row.k2))
  .agg(
    avg(_.c1),
    avg(_.c2)
  )
  .map(r => MyRow(r._1._1, r._1._2, r._2, r._3))
}
  1. 如果我将 avg(_.c1) 替换为 round(avg(_.c1)),我会收到类型错误。四舍五入我的价值观的正确方法是什么?
  2. .map(...) 行感觉不对 -- 是否有更优雅的方法来保留我的数据集的类型?

谢谢!

使用 round 确实会因类型错误而失败,因为 agg 需要类型为 TypedColumn[IN, OUT] 的聚合函数,而 round 提供 Column (适用于数据帧).

这里你需要的是一个四舍五入的平均聚合函数,org.apache.spark.sql.expressions.scalalang.typed._ 中没有提供 - 但你可以通过扩展执行平均聚合的 class 来轻松地自己创建一个:

// Extend TypedAverage - round the result before returning it
class TypedRoundAverage[IN](f: IN => Double) extends TypedAverage[IN](f) {
  override def finish(reduction: (Double, Long)): Double = math.round(super.finish(reduction))
}

// A nice wrapper to create the TypedRoundAverage for a given function  
def roundAvg[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedRoundAverage(f).toColumn

// Now you can use "roundAvg" instead of "round"  
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
  ds.groupByKey(row => (row.k1, row.k2))
    .agg(
      roundAvg(_.c1),
      roundAvg(_.c2)
    )
    .map { case ((k1, k2), c1, c2) => MyRow(k1, k2, c1, c2) } // just a nicer way to put it
}

我看不出摆脱 map 操作的方法,因为分组依据必须 returns 一个元组,但是使用模式匹配可以使它更好一些

虽然接受的答案有效并且更笼统,但在这种情况下您也可以使用 round。您只需要在使用 .as[T] 舍入后输入列(还需要将类型定义为 avg)。

.agg(
  // Alternative ways to define a type to avg
  round(avg((r: MyRow) => r.c1)).as[Double],
  round(avg[MyRow](_.c2)).as[Double]
)