如何将已排序的 RDD 拆分为 n 个部分并从每个部分获取第一个元素?
How to split a sorted RDD into n parts and get first element from each part?
我正在尝试从已排序的 RDD[String] 中提取一些元素。我尝试使用 "zipWithIndex" 并通过余数过滤 RDD 为零。
val expectedSize = 165
val n = rddOfStrings.count / expectedSize
val resultArray = rddOfStrings.sortBy(x => x).zipWithIndex.filter(x => x._2 % n == 0).map(_._1).collect
这里的问题是 "n" 并不总是整数。如果它是双精度数,则 resultArray 的大小将不等于 expectedSize(产生 +1 或 -1)。如何将其设为 return 大小相同的集合?
P.S。我通过将一个集合对象传递给所有执行者来尝试 spark 累加器。由于数据集非常大,它失败了。
这 165 个部分不可能相等,真的 - 假设总大小不是 165 的乘积,其中一些部分必须大于其他部分。
要获取这些部分 "as evenly distributed as possible",您可以使用非舍入 n
,创建 n, 2n, 3n, ...
的 Stream,然后舍入该 Stream 中的每个元素以获取索引您之后的元素,然后使用 contains
:
过滤 RDD
val expectedSize = 165
val n: Double = rddOfStrings.count.toDouble / expectedSize
val indices = Stream.iterate(0D)(x => x + n)
.map(math.round)
.take(expectedSize)
.toList
val resultArray = rddOfStrings.sortBy(x => x)
.zipWithIndex
.filter(x => indices.contains(x._2))
.map(_._1)
.collect
我正在尝试从已排序的 RDD[String] 中提取一些元素。我尝试使用 "zipWithIndex" 并通过余数过滤 RDD 为零。
val expectedSize = 165
val n = rddOfStrings.count / expectedSize
val resultArray = rddOfStrings.sortBy(x => x).zipWithIndex.filter(x => x._2 % n == 0).map(_._1).collect
这里的问题是 "n" 并不总是整数。如果它是双精度数,则 resultArray 的大小将不等于 expectedSize(产生 +1 或 -1)。如何将其设为 return 大小相同的集合?
P.S。我通过将一个集合对象传递给所有执行者来尝试 spark 累加器。由于数据集非常大,它失败了。
这 165 个部分不可能相等,真的 - 假设总大小不是 165 的乘积,其中一些部分必须大于其他部分。
要获取这些部分 "as evenly distributed as possible",您可以使用非舍入 n
,创建 n, 2n, 3n, ...
的 Stream,然后舍入该 Stream 中的每个元素以获取索引您之后的元素,然后使用 contains
:
val expectedSize = 165
val n: Double = rddOfStrings.count.toDouble / expectedSize
val indices = Stream.iterate(0D)(x => x + n)
.map(math.round)
.take(expectedSize)
.toList
val resultArray = rddOfStrings.sortBy(x => x)
.zipWithIndex
.filter(x => indices.contains(x._2))
.map(_._1)
.collect