按列 "grp" 分组并压缩 DataFrame -(对按列 "ord" 排序的每一列取最后一个非空值)

Group by column "grp" and compress DataFrame - (take last not null value for each column ordering by column "ord")

假设我有以下 DataFrame:

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  3|null|  11|
|  2|    null|  2| xxx|  22|
|  1|    null|  1| yyy|null|
|  2|    null|  7|null|  33|
|  1|    null| 12|null|null|
|  2|    null| 19|null|  77|
|  1|    null| 10| s13|null|
|  2|    null| 11| a23|null|
+---+--------+---+----+----+

这是带注释的同一个示例 DF,按 grpord 排序:

scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  1| yyy|null|
|  1|    null|  3|null|  11|   # grp:1 - last value for `col2` (11)
|  1|    null| 10| s13|null|   # grp:1 - last value for `col1` (s13)
|  1|    null| 12|null|null|   # grp:1 - last values for `null_col`, `ord`
|  2|    null|  2| xxx|  22|   
|  2|    null|  7|null|  33|   
|  2|    null| 11| a23|null|   # grp:2 - last value for `col1` (a23)
|  2|    null| 19|null|  77|   # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+

我想压缩一下。 IE。按列 "grp" 对其进行分组,对于每个组,按 "ord" 列对行进行排序,并取每列中的最后一个 not null 值(如果有的话)。

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null| 12| s13|  11|
|  2|    null| 19| a23|  77|
+---+--------+---+----+----+

我看过以下类似问题:

但我的真实 DataFrame 有超过 250 列,所以我需要一个解决方案,我不必明确指定所有列。

我无法理解它...


MCVE:如何创建示例 DataFrame:

  1. 创建本地文件“/tmp/data.txt”并复制并粘贴 DataFrame 的上下文(如上所示)
  2. 定义:
  3. 将“/tmp/data.txt”解析为 DataFrame:

    val df = readSparkOutput("file:///tmp/data.txt")
    

UPDATE:我觉得应该和下面的类似SQL:

SELECT
  grp, ord, null_col, col1, col2
FROM (
    SELECT
      grp,
      ord,
      FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) as null_col,
      FIRST(col1) OVER (PARTITION BY grp ORDER BY ord DESC) as col1,
      FIRST(col2) OVER (PARTITION BY grp ORDER BY ord DESC) as col2,
      ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) as rn
    FROM table_name) as v
WHERE v.rn = 1;

how can we dynamically generate such a Spark query?

我尝试了以下简化方法:

import org.apache.spark.sql.expressions.Window

val win = Window
  .partitionBy("grp")
  .orderBy($"ord".desc)

val cols = df.columns.map(c => first(c, ignoreNulls=true).over(win).as(c))

产生:

scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`, first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`, first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`, first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`, first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)

但我无法将其传递给 df.select:

scala> df.select(cols.head, cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
       df.select(cols.head, cols.tail: _*).show

另一次尝试:

scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
 found   : String => org.apache.spark.sql.Column
 required: org.apache.spark.sql.Column => ?
       df.select(cols.map(col): _*).show

所以这里我们按 a 分组并选择组中所有其他列的最大值:

scala> val df = List((1,2,11), (1,1,1), (2,1,4), (2,3,5)).toDF("a", "b", "c")
df: org.apache.spark.sql.DataFrame = [a: int, b: int ... 1 more field]

scala> val aggCols = df.schema.map(_.name).filter(_ != "a").map(colName => sum(col(colName)).alias(s"max_$colName"))
aggCols: Seq[org.apache.spark.sql.Column] = List(sum(b) AS `max_b`, sum(c) AS `max_c`)

scala> df.groupBy(col("a")).agg(aggCols.head, aggCols.tail: _*)
res0: org.apache.spark.sql.DataFrame = [a: int, max_b: bigint ... 1 more field]

考虑以下方法,将 Window 函数 last(c, ignoreNulls=true) 按 "ord" 按 "grp" 排序应用于每个选定列;然后是 groupBy("grp") 以获取 first agg(colFcnMap) 结果:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df0 = Seq(
  (1, 3, None, Some(11)),
  (2, 2, Some("aaa"), Some(22)),
  (1, 1, Some("s12"), None),
  (2, 7, None, Some(33)),
  (1, 12, None, None),
  (2, 19, None, Some(77)),
  (1, 10, Some("s13"), None),
  (2, 11, Some("a23"), None)
).toDF("grp", "ord", "col1", "col2")

val df = df0.withColumn("null_col", lit(null))

df.orderBy("grp", "ord").show
// +---+---+----+----+--------+
// |grp|ord|col1|col2|null_col|
// +---+---+----+----+--------+
// |  1|  1| s12|null|    null|
// |  1|  3|null|  11|    null|
// |  1| 10| s13|null|    null|
// |  1| 12|null|null|    null|
// |  2|  2| aaa|  22|    null|
// |  2|  7|null|  33|    null|
// |  2| 11| a23|null|    null|
// |  2| 19|null|  77|    null|
// +---+---+----+----+--------+

val win = Window.partitionBy("grp").orderBy("ord").
  rowsBetween(0, Window.unboundedFollowing)

val nonAggCols = Array("grp")
val cols = df.columns.diff(nonAggCols)  // Columns to be aggregated

val colFcnMap = cols.zip(Array.fill(cols.size)("first")).toMap
// colFcnMap: scala.collection.immutable.Map[String,String] =
//   Map(ord -> first, col1 -> first, col2 -> first, null_col -> first)

cols.foldLeft(df)((acc, c) =>
    acc.withColumn(c, last(c, ignoreNulls=true).over(win))
  ).
  groupBy("grp").agg(colFcnMap).
  select(col("grp") :: colFcnMap.toList.map{case (c, f) => col(s"$f($c)").as(c)}: _*).
  show
// +---+---+----+----+--------+
// |grp|ord|col1|col2|null_col|
// +---+---+----+----+--------+
// |  1| 12| s13|  11|    null|
// |  2| 19| a23|  77|    null|
// +---+---+----+----+--------+

请注意,最后的 select 用于从聚合列名称中删除函数名称(在本例中为 first())。

这是您的答案(希望是我的赏金!!!)

scala> val df = spark.sparkContext.parallelize(List(
     | (1,null.asInstanceOf[String],3,null.asInstanceOf[String],new Integer(11)),
     | (2,null.asInstanceOf[String],2,new String("xxx"),new Integer(22)),
     | (1,null.asInstanceOf[String],1,new String("yyy"),null.asInstanceOf[Integer]),
     | (2,null.asInstanceOf[String],7,null.asInstanceOf[String],new Integer(33)),
     | (1,null.asInstanceOf[String],12,null.asInstanceOf[String],null.asInstanceOf[Integer]),
     | (2,null.asInstanceOf[String],19,null.asInstanceOf[String],new Integer(77)),
     | (1,null.asInstanceOf[String],10,new String("s13"),null.asInstanceOf[Integer]),
     | (2,null.asInstanceOf[String],11,new String("a23"),null.asInstanceOf[Integer]))).toDF("grp","null_col","ord","col1","col2")
df: org.apache.spark.sql.DataFrame = [grp: int, null_col: string ... 3 more fields]

scala> df.show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  3|null|  11|
|  2|    null|  2| xxx|  22|
|  1|    null|  1| yyy|null|
|  2|    null|  7|null|  33|
|  1|    null| 12|null|null|
|  2|    null| 19|null|  77|
|  1|    null| 10| s13|null|
|  2|    null| 11| a23|null|
+---+--------+---+----+----+

//创建window规范

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val win = Window.partitionBy("grp").orderBy($"ord".desc)
win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@71878833

//在所有列上使用 first over window 规范并采用不同的

scala> val result = df.columns.foldLeft(df)((df, colName) => df.withColumn(colName, first(colName, ignoreNulls=true).over(win).as(colName))).distinct
result: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [grp: int, null_col: string ... 3 more fields]

scala> result.show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null| 12| s13|  11|
|  2|    null| 19| a23|  77|
+---+--------+---+----+----+

希望对您有所帮助。

我会采用与@LeoC 相同的方法,但我相信没有必要将列名作为字符串进行操作,我会采用更像 spark-sql 的答案。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}

val win = Window.partitionBy("grp").orderBy(col("ord")).rowsBetween(0, Window.unboundedFollowing)

// In case there is more than one group column
val nonAggCols = Seq("grp")

// Select columns to aggregate on
val cols: Seq[String] = df.columns.diff(nonAggCols).toSeq

// Map over selection and apply fct
val aggregations: Seq[Column] = cols.map(c => first(col(c), ignoreNulls = true).as(c))

// I'd rather cache the following step as it might get expensive
val step1 = cols.foldLeft(df)((acc, c) => acc.withColumn(c, last(col(c), ignoreNulls = true).over(win))).cache

// Finally we can aggregate our results as followed
val results = step1.groupBy(nonAggCols.head, nonAggCols.tail: _*).agg(aggregations.head, aggregations.tail: _*)

results.show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null| 12| s13|  11|
// |  2|    null| 19| a23|  77|
// +---+--------+---+----+----+

希望对您有所帮助。

编辑: 您没有得到相同结果的原因是您使用的 reader 不正确。

它将文件中的 null 解释为字符串而不是 null;即:

scala> df.filter('col1.isNotNull).show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null|  3|null|  11|
// |  2|    null|  2| xxx|  22|
// |  1|    null|  1| yyy|null|
// |  2|    null|  7|null|  33|
// |  1|    null| 12|null|null|
// |  2|    null| 19|null|  77|
// |  1|    null| 10| s13|null|
// |  2|    null| 11| a23|null|
// +---+--------+---+----+----+

这是我的 readSparkOutput 版本:

def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null" or col(c).isNotNull, col(c))))
}

我已经解决了一些问题,这是代码和输出

import org.apache.spark.sql.functions._
import spark.implicits._

val df0 = Seq(
  (1, 3, None, Some(11)),
  (2, 2, Some("aaa"), Some(22)),
  (1, 1, Some("s12"), None),
  (2, 7, None, Some(33)),
  (1, 12, None, None),
  (2, 19, None, Some(77)),
  (1, 10, Some("s13"), None),
  (2, 11, Some("a23"), None)
).toDF("grp", "ord", "col1", "col2")

df0.show()

//+---+---+----+----+
//|grp|ord|col1|col2|
//+---+---+----+----+
//|  1|  3|null|  11|
//|  2|  2| aaa|  22|
//|  1|  1| s12|null|
//|  2|  7|null|  33|
//|  1| 12|null|null|
//|  2| 19|null|  77|
//|  1| 10| s13|null|
//|  2| 11| a23|null|
//+---+---+----+----+

对前 2 列的数据进行排序

val df1 = df0.select("grp", "ord", "col1", "col2").orderBy("grp", "ord")

df1.show()

//+---+---+----+----+
//|grp|ord|col1|col2|
//+---+---+----+----+
//|  1|  1| s12|null|
//|  1|  3|null|  11|
//|  1| 10| s13|null|
//|  1| 12|null|null|
//|  2|  2| aaa|  22|
//|  2|  7|null|  33|
//|  2| 11| a23|null|
//|  2| 19|null|  77|
//+---+---+----+----+

val df2 = df1.groupBy("grp").agg(max("ord").alias("ord"),collect_set("col1").alias("col1"),collect_set("col2").alias("col2"))

val df3 = df2.withColumn("new_col1",$"col1".apply(size($"col1").minus(1))).withColumn("new_col2",$"col2".apply(size($"col2").minus(1)))

df3.show()

//+---+---+----------+------------+--------+--------+
//|grp|ord|      col1|        col2|new_col1|new_col2|
//+---+---+----------+------------+--------+--------+
//|  1| 12|[s12, s13]|        [11]|     s13|      11|
//|  2| 19|[aaa, a23]|[33, 22, 77]|     a23|      77|
//+---+---+----------+------------+--------+--------+

您可以使用 .drop("column_name")

删除不需要的列