在 Scala 中聚合 Spark 数据集的问题
Issues Aggregating Spark Datasets in Scala
我正在使用 Scala 的 /: 运算符计算一系列数据集聚合。下面列出了聚合代码:
def execute1(
xy: DATASET,
f: Double => Double): Double = {
println("PRINTING: The data points being evaluated: " + xy)
println("PRINTING: Running execute1")
var z = xy.filter{ case(x, y) => abs(y) > EPS}
var ret = - z./:(0.0) { case(s, (x, y)) => {
var px = f(x)
s + px*log(px/y)}
}
ret
}
当我尝试 运行 为作为 f 参数传入的单独函数列表设置块时,出现了我的问题。函数列表是:
lazy val pdfs = Map[Int, Double => Double](
1 -> betaScaled,
2 -> gammaScaled,
3 -> logNormal,
4 -> uniform,
5 -> chiSquaredScaled
)
运行通过列表聚合的执行函数是:
def execute2(
xy: DATASET,
fs: Iterable[Double=>Double]): Iterable[Double] = {
fs.map(execute1(xy, _))
}
最后的执行块:
val kl_rdd = master_ds.mapPartitions((it:DATASET) => {
val pdfsList = pdfs_broadcast.value.map(
n => pdfs.get(n).get
)
execute2(it, pdfsList).iterator
问题是,虽然确实发生了聚合,但它们似乎都聚合在输出数组的第一个槽中,而我希望单独显示每个函数的聚合。我 运行 测试以确认所有五个函数实际上都是 运行,并且它们正在第一个槽中求和。
The pre-divergence value: -4.999635700491883
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
这是我 运行 遇到过的最难的问题之一,所以任何方向都将不胜感激。将给予应有的信用。谢谢!
Spark 的数据集没有 foldLeft
(又名 /:
):https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset 并且实际上需要类型参数 DataSet[T]
并且其名称并非全部大写。
所以,我想你的 DATASET
的类型是一个迭代器,所以它在 execute1
的第一个 运行 之后被耗尽,所以每个后续的 execute1
都会变空迭代器。基本上,它不会聚合所有函数 - 它只执行第一个并忽略其他函数(你得到 -0.0 因为你将 0.0 作为初始值传递给 foldLeft)。
从mapPartitions
签名可以看出:
def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
它为您提供了一个迭代器(只能遍历一次的可变结构),因此您应该执行 it.toList
以获得(可能但有限的大)不可变结构 (List
)。
P.S。如果您想真正使用 Spark 的 DataSet/RDD - 使用 aggregate
(RDD) 或 agg
(DataSet)。另见:
关于迭代器的解释:
scala> val it = List(1,2,3).toIterator
it: Iterator[Int] = non-empty iterator
scala> it.toList //traverse iterator and accumulate its data into List
res0: List[Int] = List(1, 2, 3)
scala> it.toList //iterator is drained, so second call doesn't traverse anything
res1: List[Int] = List()
我正在使用 Scala 的 /: 运算符计算一系列数据集聚合。下面列出了聚合代码:
def execute1(
xy: DATASET,
f: Double => Double): Double = {
println("PRINTING: The data points being evaluated: " + xy)
println("PRINTING: Running execute1")
var z = xy.filter{ case(x, y) => abs(y) > EPS}
var ret = - z./:(0.0) { case(s, (x, y)) => {
var px = f(x)
s + px*log(px/y)}
}
ret
}
当我尝试 运行 为作为 f 参数传入的单独函数列表设置块时,出现了我的问题。函数列表是:
lazy val pdfs = Map[Int, Double => Double](
1 -> betaScaled,
2 -> gammaScaled,
3 -> logNormal,
4 -> uniform,
5 -> chiSquaredScaled
)
运行通过列表聚合的执行函数是:
def execute2(
xy: DATASET,
fs: Iterable[Double=>Double]): Iterable[Double] = {
fs.map(execute1(xy, _))
}
最后的执行块:
val kl_rdd = master_ds.mapPartitions((it:DATASET) => {
val pdfsList = pdfs_broadcast.value.map(
n => pdfs.get(n).get
)
execute2(it, pdfsList).iterator
问题是,虽然确实发生了聚合,但它们似乎都聚合在输出数组的第一个槽中,而我希望单独显示每个函数的聚合。我 运行 测试以确认所有五个函数实际上都是 运行,并且它们正在第一个槽中求和。
The pre-divergence value: -4.999635700491883
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
The pre-divergence value: -0.0
这是我 运行 遇到过的最难的问题之一,所以任何方向都将不胜感激。将给予应有的信用。谢谢!
Spark 的数据集没有 foldLeft
(又名 /:
):https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset 并且实际上需要类型参数 DataSet[T]
并且其名称并非全部大写。
所以,我想你的 DATASET
的类型是一个迭代器,所以它在 execute1
的第一个 运行 之后被耗尽,所以每个后续的 execute1
都会变空迭代器。基本上,它不会聚合所有函数 - 它只执行第一个并忽略其他函数(你得到 -0.0 因为你将 0.0 作为初始值传递给 foldLeft)。
从mapPartitions
签名可以看出:
def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
它为您提供了一个迭代器(只能遍历一次的可变结构),因此您应该执行 it.toList
以获得(可能但有限的大)不可变结构 (List
)。
P.S。如果您想真正使用 Spark 的 DataSet/RDD - 使用 aggregate
(RDD) 或 agg
(DataSet)。另见:
关于迭代器的解释:
scala> val it = List(1,2,3).toIterator
it: Iterator[Int] = non-empty iterator
scala> it.toList //traverse iterator and accumulate its data into List
res0: List[Int] = List(1, 2, 3)
scala> it.toList //iterator is drained, so second call doesn't traverse anything
res1: List[Int] = List()