Spark - sortWithInPartitions 优于排序
Spark - sortWithInPartitions over sort
下面是代表员工 in_date 和 out_date 的示例数据集。
我必须获得所有员工的最后一个in_time。
Spark 运行 在 4 节点独立集群上。
初始数据集:
员工ID-----in_date-----out_date
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
df.sort(col(in_date).desc())
之后的数据集:
员工ID--in_date-----out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
df.dropDup(EmployeeID):
输出 :
员工ID-----in_date-----out_date
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
预期数据集:
员工ID-----in_date-----out_date
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
但是当我用 sortWithInPartitions
对初始数据集进行排序并删除重复数据时,我得到了预期的数据集。
我在这里遗漏了什么大事或小事吗?感谢任何帮助。
附加信息:
当 df.sort 在本地模式下使用 Spark 执行时,实现了上述预期输出。
我没有做过任何类型的分区,重新分区。
初始数据集是从底层的Cassandra数据库中获取的。
TL;DR 除非明确保证,否则您永远不应假设 Spark 中的操作将以任何特定顺序执行,尤其是在使用 Spark SQL 时。
您在这里缺少的是随机播放。 dropDuplicates
实现等同于:
df.groupBy(idCols).agg(first(c) for c in nonIdCols)
将执行为:
- 部分("map-side")聚合。
- 随机播放。
- 最终("reduce-side")聚合。
中间洗牌引入了不确定性,并且不能保证最终聚合将以任何特定顺序应用。
The above expected output was achieved when df.sort was executed with Spark in local mode.
local
模式相当简单。你永远不应该用它来得出关于完全分布式模式下 Spark 内部行为的结论。
when I sorted the Initial Dataset with sortWithInPartitions and deduped I got the expected dataset.
如果数据之前按 EmployeeID
分区,这将有意义。在这种情况下,Spark 不需要额外的洗牌。
根据描述,您应该使用 中显示的解决方案之一。
下面是代表员工 in_date 和 out_date 的示例数据集。 我必须获得所有员工的最后一个in_time。
Spark 运行 在 4 节点独立集群上。
初始数据集:
员工ID-----in_date-----out_date
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
df.sort(col(in_date).desc())
之后的数据集:
员工ID--in_date-----out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
df.dropDup(EmployeeID):
输出 :
员工ID-----in_date-----out_date
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
预期数据集:
员工ID-----in_date-----out_date
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
但是当我用 sortWithInPartitions
对初始数据集进行排序并删除重复数据时,我得到了预期的数据集。
我在这里遗漏了什么大事或小事吗?感谢任何帮助。
附加信息:
当 df.sort 在本地模式下使用 Spark 执行时,实现了上述预期输出。
我没有做过任何类型的分区,重新分区。
初始数据集是从底层的Cassandra数据库中获取的。
TL;DR 除非明确保证,否则您永远不应假设 Spark 中的操作将以任何特定顺序执行,尤其是在使用 Spark SQL 时。
您在这里缺少的是随机播放。 dropDuplicates
实现等同于:
df.groupBy(idCols).agg(first(c) for c in nonIdCols)
将执行为:
- 部分("map-side")聚合。
- 随机播放。
- 最终("reduce-side")聚合。
中间洗牌引入了不确定性,并且不能保证最终聚合将以任何特定顺序应用。
The above expected output was achieved when df.sort was executed with Spark in local mode.
local
模式相当简单。你永远不应该用它来得出关于完全分布式模式下 Spark 内部行为的结论。
when I sorted the Initial Dataset with sortWithInPartitions and deduped I got the expected dataset.
如果数据之前按 EmployeeID
分区,这将有意义。在这种情况下,Spark 不需要额外的洗牌。
根据描述,您应该使用