哪些操作保留 RDD 顺序?
Which operations preserve RDD order?
RDD 有一个 有意义的(与存储模型强加的一些随机顺序相反)如果它被 sortBy()
, as explained in this .
处理
现在,哪些操作保留那个顺序?
例如,是否保证(在a.sortBy()
之后)
a.map(f).zip(a) ===
a.map(x => (f(x),x))
怎么样
a.filter(f).map(g) ===
a.map(x => (x,g(x))).filter(f(_._1)).map(_._2)
怎么样
a.filter(f).flatMap(g) ===
a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2)
这里"equality"===
理解为"functional equivalence",即无法使用用户级操作(即不读取日志&c)来区分结果。
所有操作都保留顺序,但明确不保留的操作除外。排序始终是 "meaningful",而不仅仅是在 sortBy
之后。例如,如果您读取文件 (sc.textFile
),RDD 的行将按照它们在文件中的顺序排列。
虽然没有尝试给出完整的列表,但 map
、filter
和 flatMap
确实保留了顺序。 sortBy
、partitionBy
、join
不保留顺序。
原因是大多数 RDD 操作都在分区内的 Iterator
s 上工作。所以map
或者filter
就是没办法乱序。你可以看看code自己看看。
你现在可能会问:如果我有一个带有 HashPartitioner
的 RDD 怎么办?当我使用 map
更改密钥时会发生什么?好吧,它们会留在原地,现在 RDD 没有按键分区。您可以使用 partitionBy
来恢复带有 shuffle 的分区。
在 Spark 2.0.0+ 中 coalesce
不保证合并期间的分区顺序。 DefaultPartitionCoalescer 有基于分区局部性的优化算法。当分区包含有关其位置的信息时 DefaultPartitionCoalescer
尝试合并同一主机上的分区。并且仅当没有位置信息时,它才简单地根据索引拆分分区并保留分区顺序。
更新:
如果您从文件(如 parquet)加载 DataFrame,Spark 在计划文件拆分时会破坏顺序。如果你使用它,你可以在DataSourceScanExec.scala#L629 or in new Spark 3.x FileScan#L152中看到它。它只是按大小对分区进行排序,小于 spark.sql.files.maxPartitionBytes
的拆分进入最后一个分区。
因此,如果您需要从文件加载排序的数据集,您需要实现自己的 reader。
RDD 有一个 有意义的(与存储模型强加的一些随机顺序相反)如果它被 sortBy()
, as explained in this
现在,哪些操作保留那个顺序?
例如,是否保证(在a.sortBy()
之后)
a.map(f).zip(a) ===
a.map(x => (f(x),x))
怎么样
a.filter(f).map(g) ===
a.map(x => (x,g(x))).filter(f(_._1)).map(_._2)
怎么样
a.filter(f).flatMap(g) ===
a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2)
这里"equality"===
理解为"functional equivalence",即无法使用用户级操作(即不读取日志&c)来区分结果。
所有操作都保留顺序,但明确不保留的操作除外。排序始终是 "meaningful",而不仅仅是在 sortBy
之后。例如,如果您读取文件 (sc.textFile
),RDD 的行将按照它们在文件中的顺序排列。
虽然没有尝试给出完整的列表,但 map
、filter
和 flatMap
确实保留了顺序。 sortBy
、partitionBy
、join
不保留顺序。
原因是大多数 RDD 操作都在分区内的 Iterator
s 上工作。所以map
或者filter
就是没办法乱序。你可以看看code自己看看。
你现在可能会问:如果我有一个带有 HashPartitioner
的 RDD 怎么办?当我使用 map
更改密钥时会发生什么?好吧,它们会留在原地,现在 RDD 没有按键分区。您可以使用 partitionBy
来恢复带有 shuffle 的分区。
在 Spark 2.0.0+ 中 coalesce
不保证合并期间的分区顺序。 DefaultPartitionCoalescer 有基于分区局部性的优化算法。当分区包含有关其位置的信息时 DefaultPartitionCoalescer
尝试合并同一主机上的分区。并且仅当没有位置信息时,它才简单地根据索引拆分分区并保留分区顺序。
更新:
如果您从文件(如 parquet)加载 DataFrame,Spark 在计划文件拆分时会破坏顺序。如果你使用它,你可以在DataSourceScanExec.scala#L629 or in new Spark 3.x FileScan#L152中看到它。它只是按大小对分区进行排序,小于 spark.sql.files.maxPartitionBytes
的拆分进入最后一个分区。
因此,如果您需要从文件加载排序的数据集,您需要实现自己的 reader。