反向迭代RDD

Reverse iteraror RDD

我有以下类型的纯 Scala 代码:

import breeze.numerics.log
import spire.random.Dist
import org.apache.commons.math3.distribution.NormalDistribution
import scala.collection.mutable.Buffer


def foo1(zs: Buffer[Double])={
  val S = zs.zip(zs.reverse)
    .map { case (x, y) =>log(x) * log(1 - y) }.sum
  S
}

val x = Dist.uniform(0.0, 1.0).sample[Buffer](10)
val y = x.sortWith(_<_)
val cdf=new NormalDistribution(0, 1)
val z = y.map(x_ => cdf.cumulativeProbability(x_))

foo1(z)

z 已排序,因为 cdf 递增

我想为Spark重写它,但是对于RDD数据类型没有反向方法。如何为 Spark 编写此代码?

def foo2(z_rdd: RDD[Double])={
    var S = z_rdd.zip(z_rdd.???)
    .map { case (x, y) =>log(x) * log(1 - y) }.sum
    S
}

其中 ???反转的函数 z_rdd

您可以使用 zipWithIndex 为 RDD 的值添加索引,然后按索引反向排序:

z_rdd.zip(
  z_rdd.zipWithIndex()
    .sortBy(_._2, ascending = false)
).map({ case (doubleA, (doubleB, _)) =>
  …
})

如果您尝试使用其自身的反向副本压缩 RDD,您应该记住 Spark zip 要求两个 RDD 均等分区:

http://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#zip-org.apache.spark.rdd.RDD-scala.reflect.ClassTag-

Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

因此,完成rdd zip rdd.reversed的方法是:

  1. zipWithIndex应用于RDD,正如已经建议的那样
  2. 以相反的顺序对其进行排序,并将生成的 RDD 与索引一起压缩
  3. reduceByKeygroupByKey 来自步骤 1 和 2 的 RDD 的并集,以索引为键

我不确定这个食谱是否可以改进。