反向迭代RDD
Reverse iteraror RDD
我有以下类型的纯 Scala 代码:
import breeze.numerics.log
import spire.random.Dist
import org.apache.commons.math3.distribution.NormalDistribution
import scala.collection.mutable.Buffer
def foo1(zs: Buffer[Double])={
val S = zs.zip(zs.reverse)
.map { case (x, y) =>log(x) * log(1 - y) }.sum
S
}
val x = Dist.uniform(0.0, 1.0).sample[Buffer](10)
val y = x.sortWith(_<_)
val cdf=new NormalDistribution(0, 1)
val z = y.map(x_ => cdf.cumulativeProbability(x_))
foo1(z)
z
已排序,因为 cdf
递增
我想为Spark重写它,但是对于RDD数据类型没有反向方法。如何为 Spark 编写此代码?
def foo2(z_rdd: RDD[Double])={
var S = z_rdd.zip(z_rdd.???)
.map { case (x, y) =>log(x) * log(1 - y) }.sum
S
}
其中 ???
反转的函数 z_rdd
。
您可以使用 zipWithIndex
为 RDD 的值添加索引,然后按索引反向排序:
z_rdd.zip(
z_rdd.zipWithIndex()
.sortBy(_._2, ascending = false)
).map({ case (doubleA, (doubleB, _)) =>
…
})
如果您尝试使用其自身的反向副本压缩 RDD,您应该记住 Spark zip 要求两个 RDD 均等分区:
Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
因此,完成rdd zip rdd.reversed
的方法是:
- 将
zipWithIndex
应用于RDD,正如已经建议的那样
- 以相反的顺序对其进行排序,并将生成的 RDD 与索引一起压缩
reduceByKey
或 groupByKey
来自步骤 1 和 2 的 RDD 的并集,以索引为键
我不确定这个食谱是否可以改进。
我有以下类型的纯 Scala 代码:
import breeze.numerics.log
import spire.random.Dist
import org.apache.commons.math3.distribution.NormalDistribution
import scala.collection.mutable.Buffer
def foo1(zs: Buffer[Double])={
val S = zs.zip(zs.reverse)
.map { case (x, y) =>log(x) * log(1 - y) }.sum
S
}
val x = Dist.uniform(0.0, 1.0).sample[Buffer](10)
val y = x.sortWith(_<_)
val cdf=new NormalDistribution(0, 1)
val z = y.map(x_ => cdf.cumulativeProbability(x_))
foo1(z)
z
已排序,因为 cdf
递增
我想为Spark重写它,但是对于RDD数据类型没有反向方法。如何为 Spark 编写此代码?
def foo2(z_rdd: RDD[Double])={
var S = z_rdd.zip(z_rdd.???)
.map { case (x, y) =>log(x) * log(1 - y) }.sum
S
}
其中 ???
反转的函数 z_rdd
。
您可以使用 zipWithIndex
为 RDD 的值添加索引,然后按索引反向排序:
z_rdd.zip(
z_rdd.zipWithIndex()
.sortBy(_._2, ascending = false)
).map({ case (doubleA, (doubleB, _)) =>
…
})
如果您尝试使用其自身的反向副本压缩 RDD,您应该记住 Spark zip 要求两个 RDD 均等分区:
Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
因此,完成rdd zip rdd.reversed
的方法是:
- 将
zipWithIndex
应用于RDD,正如已经建议的那样 - 以相反的顺序对其进行排序,并将生成的 RDD 与索引一起压缩
reduceByKey
或groupByKey
来自步骤 1 和 2 的 RDD 的并集,以索引为键
我不确定这个食谱是否可以改进。