哪些操作保留 RDD 顺序?

Which operations preserve RDD order?

RDD 有一个 有意义的(与存储模型强加的一些随机顺序相反)如果它被 sortBy(), as explained in this .

处理

现在,哪些操作保留那个顺序?

例如,是否保证(在a.sortBy()之后)

a.map(f).zip(a) === 
a.map(x => (f(x),x))

怎么样

a.filter(f).map(g) === 
a.map(x => (x,g(x))).filter(f(_._1)).map(_._2)

怎么样

a.filter(f).flatMap(g) === 
a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2)

这里"equality"===理解为"functional equivalence",即无法使用用户级操作(即不读取日志&c)来区分结果。

所有操作都保留顺序,但明确不保留的操作除外。排序始终是 "meaningful",而不仅仅是在 sortBy 之后。例如,如果您读取文件 (sc.textFile),RDD 的行将按照它们在文件中的顺序排列。

虽然没有尝试给出完整的列表,但 mapfilterflatMap 确实保留了顺序。 sortBypartitionByjoin 不保留顺序。

原因是大多数 RDD 操作都在分区内的 Iterators 上工作。所以map或者filter就是没办法乱序。你可以看看code自己看看。

你现在可能会问:如果我有一个带有 HashPartitioner 的 RDD 怎么办?当我使用 map 更改密钥时会发生什么?好吧,它们会留在原地,现在 RDD 没有按键分区。您可以使用 partitionBy 来恢复带有 shuffle 的分区。

在 Spark 2.0.0+ 中 coalesce 不保证合并期间的分区顺序。 DefaultPartitionCoalescer 有基于分区局部性的优化算法。当分区包含有关其位置的信息时 DefaultPartitionCoalescer 尝试合并同一主机上的分区。并且仅当没有位置信息时,它才简单地根据索引拆分分区并保留分区顺序。

更新:

如果您从文件(如 parquet)加载 DataFrame,Spark 在计划文件拆分时会破坏顺序。如果你使用它,你可以在DataSourceScanExec.scala#L629 or in new Spark 3.x FileScan#L152中看到它。它只是按大小对分区进行排序,小于 spark.sql.files.maxPartitionBytes 的拆分进入最后一个分区。

因此,如果您需要从文件加载排序的数据集,您需要实现自己的 reader。