RDD中是否有任何动作保持秩序?
Is there any action in RDD keeps the order?
我想要一个像 reduce
这样 RDD
性能的动作,但不需要运算符是可交换的。即我希望后面的 result
永远是 "123456789"
.
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> val result = rdd.someAction{ _+_ }
首先,我找到了fold
。 RDD#fold
的文档说:
def fold(zeroValue: T)(op: (T, T) ⇒ T): T Aggregate the elements of
each partition, and then the results for all the partitions, using a
given associative function and a neutral "zero value"
请注意,文档中不需要 commutative。然而,结果并不如预期:
scala> rdd.fold(""){ _+_ }
res10: String = 312456879
编辑 我已经尝试过@dk14 提到的,但没有成功:
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359
正如@YuvalItzchakov 所指出的那样,fold
在组合结果时不会保留分区 RDD
中的顺序。为了说明这一点,请考虑将原始 RDD
合并为一个分区,
scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1)
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res4: String = 123456789
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res5: String = 123456789
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res6: String = 123456789
Scala 中没有满足此条件的内置缩减操作,但您可以通过组合 mapPartitions
、collect
和本地缩减来轻松实现自己的操作:
import scala.reflect.ClassTag
def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}
使用 collect
和 reduce
的组合进行合并而不是 fold
使用的异步和无序方法可确保保留全局顺序。
这当然需要一些额外费用,包括:
- 驱动程序的内存占用略高。
- 明显更高的延迟 - 我们在开始本地缩减之前明确等待所有任务完成。
我想要一个像 reduce
这样 RDD
性能的动作,但不需要运算符是可交换的。即我希望后面的 result
永远是 "123456789"
.
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> val result = rdd.someAction{ _+_ }
首先,我找到了fold
。 RDD#fold
的文档说:
def fold(zeroValue: T)(op: (T, T) ⇒ T): T Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value"
请注意,文档中不需要 commutative。然而,结果并不如预期:
scala> rdd.fold(""){ _+_ }
res10: String = 312456879
编辑 我已经尝试过@dk14 提到的,但没有成功:
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359
正如@YuvalItzchakov 所指出的那样,fold
在组合结果时不会保留分区 RDD
中的顺序。为了说明这一点,请考虑将原始 RDD
合并为一个分区,
scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1)
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res4: String = 123456789
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res5: String = 123456789
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res6: String = 123456789
Scala 中没有满足此条件的内置缩减操作,但您可以通过组合 mapPartitions
、collect
和本地缩减来轻松实现自己的操作:
import scala.reflect.ClassTag
def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}
使用 collect
和 reduce
的组合进行合并而不是 fold
使用的异步和无序方法可确保保留全局顺序。
这当然需要一些额外费用,包括:
- 驱动程序的内存占用略高。
- 明显更高的延迟 - 我们在开始本地缩减之前明确等待所有任务完成。