Does drop_duplicates guarantee to keep the first row and drop the rest of the rows after sorting the DataFrame in Spark?
I have a DataFrame, read from an Avro file in Hadoop, with three columns (a, b, c): one is the key column and, of the other two, one is of integer type and the other of date type.
I sort the frame by the integer and date columns, then call drop_duplicates on the resulting frame by the key column (a).
frame = frame.orderBy(["b", "c"], ascending=False)
frame = frame.drop_duplicates(['a'])
From the Spark Scala code, I can see that orderBy internally calls the sort method, which performs a global sort.
/**
* Returns a new Dataset sorted by the given expressions. For example:
* {{{
* ds.sort($"col1", $"col2".desc)
* }}}
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): Dataset[T] = {
sortInternal(global = true, sortExprs)
}
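That global ordering can also be checked from PySpark by inspecting the query plan; a minimal sketch using the question's frame (not part of the original post):

# Print the logical and physical plans of the ordered frame. With a global sort,
# the physical plan typically shows a Sort node fed by a range-partitioning Exchange.
frame.orderBy(["b", "c"], ascending=False).explain(True)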
And the drop_duplicates(cols) method is rewritten into Aggregate(first(cols)), according to the Spark code below.
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
    case d @ Deduplicate(keys, child) if !child.isStreaming =>
      val keyExprIds = keys.map(_.exprId)
      val aggCols = child.output.map { attr =>
        if (keyExprIds.contains(attr.exprId)) {
          attr
        } else {
          Alias(new First(attr).toAggregateExpression(), attr.name)()
        }
      }
      // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
      // aggregations by checking the number of grouping keys. The key difference here is that a
      // global aggregation always returns at least one row even if there are no input rows. Here
      // we append a literal when the grouping key list is empty so that the result aggregate
      // operator is properly treated as a grouping aggregation.
      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
      val newAgg = Aggregate(nonemptyKeys, aggCols, child)
      val attrMapping = d.output.zip(newAgg.output)
      newAgg -> attrMapping
  }
}
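In PySpark terms, the rewrite above is roughly equivalent to the following aggregation (a sketch using the question's columns; note that first() carries no ordering guarantee):

from pyspark.sql import functions as F

# Rough equivalent of what ReplaceDeduplicateWithAggregate produces for
# drop_duplicates(['a']): group by the key and take first() of every other column.
# first() returns whichever row reaches the aggregation first, which is not
# guaranteed to be the globally sorted order once the data is shuffled.
equivalent = frame.groupBy("a").agg(
    F.first("b").alias("b"),
    F.first("c").alias("c"),
)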
So I expected drop_duplicates to retain the first row after the sort and drop the rest. But I observe in my Spark job that this is not the case.
Any ideas?
No.
Sorting by b and c and then dropping duplicates on a will work the way you want if and only if there is only 1 partition to process. For big data, that is generally not the case.
So, as you can google elsewhere: dropDuplicates retains the first occurrence after a sort operation only if there is 1 partition; otherwise it is luck, i.e. non-deterministic when more partitions are in play.
This has nothing to do with Avro or pyspark. Moreover, sorting by b, c may itself be non-deterministic.
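If the goal is a deterministic "keep the latest b, c per key a", one common alternative (a sketch, not taken from the answer above) is a window function with row_number, so the ordering is applied per key instead of relying on it surviving drop_duplicates:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank rows within each key by b and c descending and keep only the top row.
# The ordering is evaluated inside the window, so the result does not depend
# on how the data happens to be partitioned.
w = Window.partitionBy("a").orderBy(F.col("b").desc(), F.col("c").desc())
deduped = (
    frame.withColumn("rn", F.row_number().over(w))
         .filter(F.col("rn") == 1)
         .drop("rn")
)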