按重叠列分区时的高效 spark 数据集操作
Efficient spark dataset operations when partitioned by overlapping columns
我有一个如下所示的数据集("guid"、"timestamp"、"agt")
val df = List(Test("a", "1", null),
Test("b", "2", "4"),
Test("a", "1", "3"),
Test("b", "2", "4"),
Test("c", "1", "3"),
Test("a", "6", "8"),
Test("b", "2", "4"),
Test("a", "1", "4")
我需要计算
- 按 guid 分组时每行的最小时间戳。
- 按(guid、时间戳)分组时每个键的计数
- 行的 agtM 按 guid 分组并按 timestamp(desc) 排序,然后取第一个非空 agt else ""
- 删除重复项
因此输出将如下所示。
+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
| c| 1| 3| 1| 1| 3|
| b| 2| 4| 2| 3| 4|
| a| 1| | 1| 3| 8|
| a| 6| 8| 1| 1| 8|
+----+---------+---+-------+-----+----+
我试过了
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val gg = df.toDS()
.withColumn("minimum", min("timestamp").over(w))
.withColumn("count", count("*").over(w1))
.withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
.dropDuplicates("guid", "timestamp")
agtM 计算我不是很自信。我的目标是实现最少的改组,因为在这种情况下,我们首先按 guid 分组,然后按(guid,时间戳)分组,逻辑上第二次分组应该发生在第一个创建的分区中。然后输出按 guid 分组并与另一个 table 连接。这两个数据都非常大(以 TB 为单位),所以希望通过最少的改组来实现这一点,并且不想稍后将计算移动到 mapGroups 中(我可以简单地通过使用非空 agenttime 然后 maxBy 过滤组来完成 agtM 计算时间戳)。您能否提出实现上述目标的最佳方法?
编辑
agtM 计算已修复。只是为了为前面的操作提供更多上下文,输出和另一个数据集(一个额外的字段,我们在输出中保持虚拟)的联合将需要按键分组以产生最终结果。我也在考虑计算每个分区(mapPartitions)内的这些值(window w除外),然后将每个分区内的列表作为另一个列表并进行进一步计算。
到back-fillagtM
最后一个non-emptyagt
的值,你可以利用last("agt", ignoreNulls)
和rowsBetween()
的w2
:
val ds = Seq(
("a", "1", ""),
("b", "2", "4"),
("a", "1", "3"),
("b", "2", "4"),
("c", "1", "3"),
("a", "6", "8"),
("b", "2", "4"),
("a", "1", "4")
).toDF("guid", "timestamp", "agt").
as[(String, String, String)]
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).
rowsBetween(Window.unboundedPreceding, 0)
ds.
withColumn("minimum", min("timestamp").over(w)).
withColumn("count", count("*").over(w1)).
withColumn("agt", when($"agt" =!= "", $"agt")).
withColumn("agtM", last("agt", ignoreNulls = true).over(w2)).
na.fill("", Seq("agt")).
dropDuplicates("guid", "timestamp").
show
// +----+---------+---+-------+-----+----+
// |guid|timestamp|agt|minimum|count|agtM|
// +----+---------+---+-------+-----+----+
// | c| 1| 3| 1| 1| 3|
// | b| 2| 4| 2| 3| 4|
// | a| 1| | 1| 3| 8|
// | a| 6| 8| 1| 1| 8|
// +----+---------+---+-------+-----+----+
鉴于您的每个 window 规范 w
、w1
和 w2
都有自己的特定要求,我不确定可以做多少来减少洗牌。您可以探索 non-window 方法,尽管您打算创建的结果数据集似乎很适合使用 window 函数。
I need to compute
the minimum timestamp for each row when grouped by guid.
The count for each key when grouped by (guid, timestamp)
The agtM of row when grouped by guid and ordered by timestamp(desc) and then take first non empty agt else ""
根据您的要求,您需要计算 guid 组上 agt 的最小时间戳、agtM(最新),并在按 guid 和时间戳分组时进行计数。这些要求表明您需要 三个分组和三个洗牌。
第一次分组打乱-求个数
val dfWithCount = df
.groupBy("guid", "timestamp")
.agg(count("guid").as("count"))
第二次和第三次分组打乱
latest agt 即 agtM 可以通过使用 Window
函数找到并且 minimumn timestamp 可以通过使用另一个 groupBy
和 aggregation
val dfWithMinAndMax = df.withColumn("agtM", first("agt").over(windowSpec))
.groupBy("guid", "agtM")
.agg(min("timestamp").as("minimum")
)
最后 join
两个 数据帧
val finalDF = dfWithCount.join(dfWithMinAndMax, Seq("guid"))
这会给你 正确的数据帧 但没有 agt
+----+---------+-----+----+-------+
|guid|timestamp|count|agtM|minimum|
+----+---------+-----+----+-------+
|c |1 |1 |3 |1 |
|b |2 |3 |4 |2 |
|a |1 |3 |8 |1 |
|a |6 |1 |8 |1 |
+----+---------+-----+----+-------+
我想 agt
没那么重要,但如果你真的需要它,那么你需要另一个 grouping 和 shuffling和 加入
val dfWithAgt = df.groupBy("guid", "timestamp").agg(min("agt").as("agt"))
finalDF.join(dfWithAgt, Seq("guid", "timestamp"))
这会给你
+----+---------+-----+----+-------+---+
|guid|timestamp|count|agtM|minimum|agt|
+----+---------+-----+----+-------+---+
|c |1 |1 |3 |1 |3 |
|b |2 |3 |4 |2 |4 |
|a |1 |3 |8 |1 | |
|a |6 |1 |8 |1 |8 |
+----+---------+-----+----+-------+---+
可以使用 select
完成列顺序。
希望回答对你有帮助
最初通过 guid 对其进行分区,然后使用迭代器从逻辑上讲会减少改组。如果每组里面的数据很大,不知道效果如何。
df.toDS().groupByKey(_.guid).flatMapGroups((a,b) => {
val list = b.toList
val minimum = list.minBy(_.timestamp).timestamp
val filteredList = list.filterNot(_.agt == "")
val agtM = if(filteredList.isEmpty) "" else filteredList.maxBy(_.timestamp).agt
list.groupBy(_.timestamp).map(r => (r._2.head.guid, r._1, r._2.head.agt, minimum, r._2.length, agtM))
}).select($"_1".as("guid"), $"_2".as("timestamp"),
$"_3".as("agt"), $"_4".as("minimum"), $"_5".as("count"), $"_6".as("agtM")).show()
我有一个如下所示的数据集("guid"、"timestamp"、"agt")
val df = List(Test("a", "1", null),
Test("b", "2", "4"),
Test("a", "1", "3"),
Test("b", "2", "4"),
Test("c", "1", "3"),
Test("a", "6", "8"),
Test("b", "2", "4"),
Test("a", "1", "4")
我需要计算
- 按 guid 分组时每行的最小时间戳。
- 按(guid、时间戳)分组时每个键的计数
- 行的 agtM 按 guid 分组并按 timestamp(desc) 排序,然后取第一个非空 agt else ""
- 删除重复项
因此输出将如下所示。
+----+---------+---+-------+-----+----+
|guid|timestamp|agt|minimum|count|agtM|
+----+---------+---+-------+-----+----+
| c| 1| 3| 1| 1| 3|
| b| 2| 4| 2| 3| 4|
| a| 1| | 1| 3| 8|
| a| 6| 8| 1| 1| 8|
+----+---------+---+-------+-----+----+
我试过了
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val gg = df.toDS()
.withColumn("minimum", min("timestamp").over(w))
.withColumn("count", count("*").over(w1))
.withColumn("agtM", coalesce(first($"agt", true).over(w2), lit("")))
.dropDuplicates("guid", "timestamp")
agtM 计算我不是很自信。我的目标是实现最少的改组,因为在这种情况下,我们首先按 guid 分组,然后按(guid,时间戳)分组,逻辑上第二次分组应该发生在第一个创建的分区中。然后输出按 guid 分组并与另一个 table 连接。这两个数据都非常大(以 TB 为单位),所以希望通过最少的改组来实现这一点,并且不想稍后将计算移动到 mapGroups 中(我可以简单地通过使用非空 agenttime 然后 maxBy 过滤组来完成 agtM 计算时间戳)。您能否提出实现上述目标的最佳方法?
编辑
agtM 计算已修复。只是为了为前面的操作提供更多上下文,输出和另一个数据集(一个额外的字段,我们在输出中保持虚拟)的联合将需要按键分组以产生最终结果。我也在考虑计算每个分区(mapPartitions)内的这些值(window w除外),然后将每个分区内的列表作为另一个列表并进行进一步计算。
到back-fillagtM
最后一个non-emptyagt
的值,你可以利用last("agt", ignoreNulls)
和rowsBetween()
的w2
:
val ds = Seq(
("a", "1", ""),
("b", "2", "4"),
("a", "1", "3"),
("b", "2", "4"),
("c", "1", "3"),
("a", "6", "8"),
("b", "2", "4"),
("a", "1", "4")
).toDF("guid", "timestamp", "agt").
as[(String, String, String)]
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"guid")
val w1 = Window.partitionBy($"guid", $"timestamp")
val w2 = Window.partitionBy($"guid").orderBy($"timestamp".desc).
rowsBetween(Window.unboundedPreceding, 0)
ds.
withColumn("minimum", min("timestamp").over(w)).
withColumn("count", count("*").over(w1)).
withColumn("agt", when($"agt" =!= "", $"agt")).
withColumn("agtM", last("agt", ignoreNulls = true).over(w2)).
na.fill("", Seq("agt")).
dropDuplicates("guid", "timestamp").
show
// +----+---------+---+-------+-----+----+
// |guid|timestamp|agt|minimum|count|agtM|
// +----+---------+---+-------+-----+----+
// | c| 1| 3| 1| 1| 3|
// | b| 2| 4| 2| 3| 4|
// | a| 1| | 1| 3| 8|
// | a| 6| 8| 1| 1| 8|
// +----+---------+---+-------+-----+----+
鉴于您的每个 window 规范 w
、w1
和 w2
都有自己的特定要求,我不确定可以做多少来减少洗牌。您可以探索 non-window 方法,尽管您打算创建的结果数据集似乎很适合使用 window 函数。
I need to compute
the minimum timestamp for each row when grouped by guid.
The count for each key when grouped by (guid, timestamp)
The agtM of row when grouped by guid and ordered by timestamp(desc) and then take first non empty agt else ""
根据您的要求,您需要计算 guid 组上 agt 的最小时间戳、agtM(最新),并在按 guid 和时间戳分组时进行计数。这些要求表明您需要 三个分组和三个洗牌。
第一次分组打乱-求个数
val dfWithCount = df
.groupBy("guid", "timestamp")
.agg(count("guid").as("count"))
第二次和第三次分组打乱
latest agt 即 agtM 可以通过使用 Window
函数找到并且 minimumn timestamp 可以通过使用另一个 groupBy
和 aggregation
val dfWithMinAndMax = df.withColumn("agtM", first("agt").over(windowSpec))
.groupBy("guid", "agtM")
.agg(min("timestamp").as("minimum")
)
最后 join
两个 数据帧
val finalDF = dfWithCount.join(dfWithMinAndMax, Seq("guid"))
这会给你 正确的数据帧 但没有 agt
+----+---------+-----+----+-------+
|guid|timestamp|count|agtM|minimum|
+----+---------+-----+----+-------+
|c |1 |1 |3 |1 |
|b |2 |3 |4 |2 |
|a |1 |3 |8 |1 |
|a |6 |1 |8 |1 |
+----+---------+-----+----+-------+
我想 agt
没那么重要,但如果你真的需要它,那么你需要另一个 grouping 和 shuffling和 加入
val dfWithAgt = df.groupBy("guid", "timestamp").agg(min("agt").as("agt"))
finalDF.join(dfWithAgt, Seq("guid", "timestamp"))
这会给你
+----+---------+-----+----+-------+---+
|guid|timestamp|count|agtM|minimum|agt|
+----+---------+-----+----+-------+---+
|c |1 |1 |3 |1 |3 |
|b |2 |3 |4 |2 |4 |
|a |1 |3 |8 |1 | |
|a |6 |1 |8 |1 |8 |
+----+---------+-----+----+-------+---+
可以使用 select
完成列顺序。
希望回答对你有帮助
最初通过 guid 对其进行分区,然后使用迭代器从逻辑上讲会减少改组。如果每组里面的数据很大,不知道效果如何。
df.toDS().groupByKey(_.guid).flatMapGroups((a,b) => {
val list = b.toList
val minimum = list.minBy(_.timestamp).timestamp
val filteredList = list.filterNot(_.agt == "")
val agtM = if(filteredList.isEmpty) "" else filteredList.maxBy(_.timestamp).agt
list.groupBy(_.timestamp).map(r => (r._2.head.guid, r._1, r._2.head.agt, minimum, r._2.length, agtM))
}).select($"_1".as("guid"), $"_2".as("timestamp"),
$"_3".as("agt"), $"_4".as("minimum"), $"_5".as("count"), $"_6".as("agtM")).show()