按重叠列分区时的高效 spark 数据集操作

Efficient spark dataset operations when partitioned by overlapping columns

我有一个如下所示的数据集("guid"、"timestamp"、"agt")

val df = List(Test("a", "1", null),
   Test("b", "2", "4"),
   Test("a", "1", "3"),
   Test("b", "2", "4"),
   Test("c", "1", "3"),
   Test("a", "6", "8"),
   Test("b", "2", "4"),
   Test("a", "1", "4")

我需要计算

因此输出将如下所示。

+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
|   c|        1|  3|      1|    1|   3|
|   b|        2|  4|      2|    3|   4|
|   a|        1|   |      1|    3|   8|
|   a|        6|  8|      1|    1|   8|
+----+---------+---+-------+-----+----+

我试过了

val w = Window.partitionBy($"guid")

    val w1 = Window.partitionBy($"guid", $"timestamp")
    val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    val gg = df.toDS()
      .withColumn("minimum", min("timestamp").over(w))
      .withColumn("count", count("*").over(w1))
      .withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
      .dropDuplicates("guid", "timestamp")

agtM 计算我不是很自信。我的目标是实现最少的改组,因为在这种情况下,我们首先按 guid 分组,然后按(guid,时间戳)分组,逻辑上第二次分组应该发生在第一个创建的分区中。然后输出按 guid 分组并与另一个 table 连接。这两个数据都非常大(以 TB 为单位),所以希望通过最少的改组来实现这一点,并且不想稍后将计算移动到 mapGroups 中(我可以简单地通过使用非空 agenttime 然后 maxBy 过滤组来完成 agtM 计算时间戳)。您能否提出实现上述目标的最佳方法?

编辑

agtM 计算已修复。只是为了为前面的操作提供更多上下文,输出和另一个数据集(一个额外的字段,我们在输出中保持虚拟)的联合将需要按键分组以产生最终结果。我也在考虑计算每个分区(mapPartitions)内的这些值(window w除外),然后将每个分区内的列表作为另一个列表并进行进一步计算。

到back-fillagtM最后一个non-emptyagt的值,你可以利用last("agt", ignoreNulls)rowsBetween()w2:

val ds = Seq(
  ("a", "1", ""),
  ("b", "2", "4"),
  ("a", "1", "3"),
  ("b", "2", "4"),
  ("c", "1", "3"),
  ("a", "6", "8"),
  ("b", "2", "4"),
  ("a", "1", "4")
).toDF("guid", "timestamp", "agt").
  as[(String, String, String)]

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).
  rowsBetween(Window.unboundedPreceding, 0)

ds.
  withColumn("minimum", min("timestamp").over(w)).
  withColumn("count", count("*").over(w1)).
  withColumn("agt", when($"agt" =!= "", $"agt")).
  withColumn("agtM", last("agt", ignoreNulls = true).over(w2)).
  na.fill("", Seq("agt")).
  dropDuplicates("guid", "timestamp").
  show
// +----+---------+---+-------+-----+----+
// |guid|timestamp|agt|minimum|count|agtM|
// +----+---------+---+-------+-----+----+
// |   c|        1|  3|      1|    1|   3|
// |   b|        2|  4|      2|    3|   4|
// |   a|        1|   |      1|    3|   8|
// |   a|        6|  8|      1|    1|   8|
// +----+---------+---+-------+-----+----+

鉴于您的每个 window 规范 ww1w2 都有自己的特定要求,我不确定可以做多少来减少洗牌。您可以探索 non-window 方法,尽管您打算创建的结果数据集似乎很适合使用 window 函数。

I need to compute
the minimum timestamp for each row when grouped by guid.
The count for each key when grouped by (guid, timestamp)
The agtM of row when grouped by guid and ordered by timestamp(desc) and then take first non empty agt else ""

根据您的要求,您需要计算 guid 组上 agt 的最小时间戳、agtM(最新),并在按 guid 和时间戳分组时进行计数。这些要求表明您需要 三个分组和三个洗牌

第一次分组打乱-求个数

val dfWithCount = df
      .groupBy("guid", "timestamp")
      .agg(count("guid").as("count"))

第二次和第三次分组打乱

latest agt 即 agtM 可以通过使用 Window 函数找到并且 minimumn timestamp 可以通过使用另一个 groupByaggregation

val dfWithMinAndMax = df.withColumn("agtM", first("agt").over(windowSpec))
      .groupBy("guid", "agtM")
      .agg(min("timestamp").as("minimum")
      )

最后 join 两个 数据帧

val finalDF = dfWithCount.join(dfWithMinAndMax, Seq("guid"))

这会给你 正确的数据帧 但没有 agt

+----+---------+-----+----+-------+
|guid|timestamp|count|agtM|minimum|
+----+---------+-----+----+-------+
|c   |1        |1    |3   |1      |
|b   |2        |3    |4   |2      |
|a   |1        |3    |8   |1      |
|a   |6        |1    |8   |1      |
+----+---------+-----+----+-------+

我想 agt 没那么重要,但如果你真的需要它,那么你需要另一个 groupingshuffling加入

val dfWithAgt = df.groupBy("guid", "timestamp").agg(min("agt").as("agt"))

finalDF.join(dfWithAgt, Seq("guid", "timestamp"))

这会给你

+----+---------+-----+----+-------+---+
|guid|timestamp|count|agtM|minimum|agt|
+----+---------+-----+----+-------+---+
|c   |1        |1    |3   |1      |3  |
|b   |2        |3    |4   |2      |4  |
|a   |1        |3    |8   |1      |   |
|a   |6        |1    |8   |1      |8  |
+----+---------+-----+----+-------+---+

可以使用 select 完成列顺序。

希望回答对你有帮助

最初通过 guid 对其进行分区,然后使用迭代器从逻辑上讲会减少改组。如果每组里面的数据很大,不知道效果如何。

df.toDS().groupByKey(_.guid).flatMapGroups((a,b) => {
          val list = b.toList
          val minimum = list.minBy(_.timestamp).timestamp
          val filteredList = list.filterNot(_.agt == "")
          val agtM = if(filteredList.isEmpty) "" else filteredList.maxBy(_.timestamp).agt
          list.groupBy(_.timestamp).map(r => (r._2.head.guid, r._1, r._2.head.agt, minimum, r._2.length, agtM))
        }).select($"_1".as("guid"), $"_2".as("timestamp"),
          $"_3".as("agt"), $"_4".as("minimum"), $"_5".as("count"), $"_6".as("agtM")).show()