如何使用列映射和聚合函数为 agg 中的第一个函数设置 ignoreNulls 标志?

How to set ignoreNulls flag for first function in agg with map of columns and aggregate functions?

我有大约 20-25 个来自 conf 文件的列列表,必须聚合第一个 Notnull 值。我尝试了通过读取 conf 文件传递​​列列表和 agg expr 的函数。 我能够获得第一个函数,但找不到如何将 ignoreNull 值指定为 true。

我试过的代码是

def groupAndAggregate(df: DataFrame,  cols: List[String] , aggregateFun: Map[String, String]): DataFrame = {
    df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}

val df = sc.parallelize(Seq(
  (0,  null, "1"),
  (1, "2", "2"),
  (0, "3", "3"),
  (0, "4", "4"),
  (1, "5", "5"),
  (1, "6", "6"),
  (1, "7", "7")
)).toDF("grp", "col1", "col2")


//first
groupAndAggregate(df,  List("grp"), Map("col1"-> "first", "col2"-> "COUNT") ).show()

+---+-----------+-----------+
|grp|first(col1)|count(col2)|
+---+-----------+-----------+
|  1|          2|          4|
|  0|           |          3|
+---+-----------+-----------+

我需要得到 3 作为结果来代替 null。 我正在使用 Spark 2.1.0 和 Scala 2.11

编辑 1:

如果我使用下面的函数

import org.apache.spark.sql.functions.{first,count}
df.groupBy("grp").agg(first(df("col1"), ignoreNulls = true), count("col2")).show()

我得到了我想要的结果。我们可以为 Map 中的第一个函数传递 ignoreNulls true 吗?

我认为您应该在聚合之前使用 na operator and drop 所有 null

na: DataFrameNaFunctions Returns a DataFrameNaFunctions for working with missing data.

drop(cols: Array[String]): DataFrame Returns a new DataFrame that drops rows containing any null or NaN values in the specified columns.

代码将如下所示:

df.na.drop("col1").groupBy(...).agg(first("col1"))

这会影响 count,因此您必须单独 count

我已经能够通过创建列列表并将其传递给 groupBy 的 agg 函数来实现这一点。早期的方法有一个问题,我无法命名列,因为 agg 函数没有返回输出 DF 中列的顺序,我已经重命名了列表本身中的列。

     import org.apache.spark.sql.functions._

     def groupAndAggregate(df: DataFrame): DataFrame = {
        val list: ListBuffer[Column] = new ListBuffer[Column]()
        try {

          val columnFound = getAggColumns(df) // function to return a Map[String, String]

          val agg_func = columnFound.entrySet().toList.
            foreach(field =>
              list += first(df(columnFound.getOrDefault(field.getKey, "")),ignoreNulls = true).as(field.getKey)
            )

          list += sum(df("col1")).as("watch_time")
          list += count("*").as("frequency")

          val groupColumns = getGroupColumns(df) // function to return a List[String]

          val output = df.groupBy(groupColumns.head, groupColumns.tail: _*).agg(
            list.head, list.tail: _*
          )

          output

        } catch {
          case e: Exception => {
            e.printStackTrace()}
            null
        }
      }