Spark - sortWithInPartitions 优于排序

Spark - sortWithInPartitions over sort

下面是代表员工 in_date 和 out_date 的示例数据集。 我必须获得所有员工的最后一个in_time。

Spark 运行 在 4 节点独立集群上。

初始数据集:

员工ID-----in_date-----out_date

1111111     2017-04-20  2017-09-14 
1111111     2017-11-02  null 
2222222     2017-09-26  2017-09-26 
2222222     2017-11-28  null 
3333333     2016-01-07  2016-01-20 
3333333     2017-10-25  null 

df.sort(col(in_date).desc())之后的数据集:

员工ID--in_date-----out_date

1111111   2017-11-02   null 
1111111   2017-04-20   2017-09-14 
2222222   2017-09-26   2017-09-26 
2222222   2017-11-28   null 
3333333   2017-10-25   null 
3333333   2016-01-07   2016-01-20 
df.dropDup(EmployeeID):  

输出 :

员工ID-----in_date-----out_date

1111111    2017-11-02    null 
2222222    2017-09-26    2017-09-26 
3333333    2016-01-07    2016-01-20 

预期数据集:

员工ID-----in_date-----out_date

1111111    2017-11-02   null 
2222222    2017-11-28   null 
3333333    2017-10-25   null 

但是当我用 sortWithInPartitions 对初始数据集进行排序并删除重复数据时,我得到了预期的数据集。 我在这里遗漏了什么大事或小事吗?感谢任何帮助。

附加信息: 当 df.sort 在本地模式下使用 Spark 执行时,实现了上述预期输出。
我没有做过任何类型的分区,重新分区。 初始数据集是从底层的Cassandra数据库中获取的。

TL;DR 除非明确保证,否则您永远不应假设 Spark 中的操作将以任何特定顺序执行,尤其是在使用 Spark SQL 时。

您在这里缺少的是随机播放。 dropDuplicates 实现等同于:

df.groupBy(idCols).agg(first(c) for c in nonIdCols)

将执行为:

  • 部分("map-side")聚合。
  • 随机播放。
  • 最终("reduce-side")聚合。

中间洗牌引入了不确定性,并且不能保证最终聚合将以任何特定顺序应用。

The above expected output was achieved when df.sort was executed with Spark in local mode.

local 模式相当简单。你永远不应该用它来得出关于完全分布式模式下 Spark 内部行为的结论。

when I sorted the Initial Dataset with sortWithInPartitions and deduped I got the expected dataset.

如果数据之前按 EmployeeID 分区,这将有意义。在这种情况下,Spark 不需要额外的洗牌。

根据描述,您应该使用 中显示的解决方案之一。