如何使用列映射和聚合函数为 agg 中的第一个函数设置 ignoreNulls 标志?
How to set ignoreNulls flag for first function in agg with map of columns and aggregate functions?
我有大约 20-25 个来自 conf 文件的列列表,必须聚合第一个 Notnull 值。我尝试了通过读取 conf 文件传递列列表和 agg expr 的函数。
我能够获得第一个函数,但找不到如何将 ignoreNull
值指定为 true。
我试过的代码是
def groupAndAggregate(df: DataFrame, cols: List[String] , aggregateFun: Map[String, String]): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
val df = sc.parallelize(Seq(
(0, null, "1"),
(1, "2", "2"),
(0, "3", "3"),
(0, "4", "4"),
(1, "5", "5"),
(1, "6", "6"),
(1, "7", "7")
)).toDF("grp", "col1", "col2")
//first
groupAndAggregate(df, List("grp"), Map("col1"-> "first", "col2"-> "COUNT") ).show()
+---+-----------+-----------+
|grp|first(col1)|count(col2)|
+---+-----------+-----------+
| 1| 2| 4|
| 0| | 3|
+---+-----------+-----------+
我需要得到 3 作为结果来代替 null。
我正在使用 Spark 2.1.0 和 Scala 2.11
编辑 1:
如果我使用下面的函数
import org.apache.spark.sql.functions.{first,count}
df.groupBy("grp").agg(first(df("col1"), ignoreNulls = true), count("col2")).show()
我得到了我想要的结果。我们可以为 Map 中的第一个函数传递 ignoreNulls
true
吗?
我认为您应该在聚合之前使用 na operator and drop 所有 null
。
na: DataFrameNaFunctions Returns a DataFrameNaFunctions for working with missing data.
drop(cols: Array[String]): DataFrame Returns a new DataFrame that drops rows containing any null or NaN values in the specified columns.
代码将如下所示:
df.na.drop("col1").groupBy(...).agg(first("col1"))
这会影响 count
,因此您必须单独 count
。
我已经能够通过创建列列表并将其传递给 groupBy 的 agg 函数来实现这一点。早期的方法有一个问题,我无法命名列,因为 agg 函数没有返回输出 DF 中列的顺序,我已经重命名了列表本身中的列。
import org.apache.spark.sql.functions._
def groupAndAggregate(df: DataFrame): DataFrame = {
val list: ListBuffer[Column] = new ListBuffer[Column]()
try {
val columnFound = getAggColumns(df) // function to return a Map[String, String]
val agg_func = columnFound.entrySet().toList.
foreach(field =>
list += first(df(columnFound.getOrDefault(field.getKey, "")),ignoreNulls = true).as(field.getKey)
)
list += sum(df("col1")).as("watch_time")
list += count("*").as("frequency")
val groupColumns = getGroupColumns(df) // function to return a List[String]
val output = df.groupBy(groupColumns.head, groupColumns.tail: _*).agg(
list.head, list.tail: _*
)
output
} catch {
case e: Exception => {
e.printStackTrace()}
null
}
}
我有大约 20-25 个来自 conf 文件的列列表,必须聚合第一个 Notnull 值。我尝试了通过读取 conf 文件传递列列表和 agg expr 的函数。
我能够获得第一个函数,但找不到如何将 ignoreNull
值指定为 true。
我试过的代码是
def groupAndAggregate(df: DataFrame, cols: List[String] , aggregateFun: Map[String, String]): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
val df = sc.parallelize(Seq(
(0, null, "1"),
(1, "2", "2"),
(0, "3", "3"),
(0, "4", "4"),
(1, "5", "5"),
(1, "6", "6"),
(1, "7", "7")
)).toDF("grp", "col1", "col2")
//first
groupAndAggregate(df, List("grp"), Map("col1"-> "first", "col2"-> "COUNT") ).show()
+---+-----------+-----------+
|grp|first(col1)|count(col2)|
+---+-----------+-----------+
| 1| 2| 4|
| 0| | 3|
+---+-----------+-----------+
我需要得到 3 作为结果来代替 null。 我正在使用 Spark 2.1.0 和 Scala 2.11
编辑 1:
如果我使用下面的函数
import org.apache.spark.sql.functions.{first,count}
df.groupBy("grp").agg(first(df("col1"), ignoreNulls = true), count("col2")).show()
我得到了我想要的结果。我们可以为 Map 中的第一个函数传递 ignoreNulls
true
吗?
我认为您应该在聚合之前使用 na operator and drop 所有 null
。
na: DataFrameNaFunctions Returns a DataFrameNaFunctions for working with missing data.
drop(cols: Array[String]): DataFrame Returns a new DataFrame that drops rows containing any null or NaN values in the specified columns.
代码将如下所示:
df.na.drop("col1").groupBy(...).agg(first("col1"))
这会影响 count
,因此您必须单独 count
。
我已经能够通过创建列列表并将其传递给 groupBy 的 agg 函数来实现这一点。早期的方法有一个问题,我无法命名列,因为 agg 函数没有返回输出 DF 中列的顺序,我已经重命名了列表本身中的列。
import org.apache.spark.sql.functions._
def groupAndAggregate(df: DataFrame): DataFrame = {
val list: ListBuffer[Column] = new ListBuffer[Column]()
try {
val columnFound = getAggColumns(df) // function to return a Map[String, String]
val agg_func = columnFound.entrySet().toList.
foreach(field =>
list += first(df(columnFound.getOrDefault(field.getKey, "")),ignoreNulls = true).as(field.getKey)
)
list += sum(df("col1")).as("watch_time")
list += count("*").as("frequency")
val groupColumns = getGroupColumns(df) // function to return a List[String]
val output = df.groupBy(groupColumns.head, groupColumns.tail: _*).agg(
list.head, list.tail: _*
)
output
} catch {
case e: Exception => {
e.printStackTrace()}
null
}
}