如何解读 RDD.treeAggregate
how to interpret RDD.treeAggregate
我运行进入this line在Apache Spark代码源
val (gradientSum, lossSum, miniBatchSize) = data
.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
}
)
我在阅读这篇文章时遇到了很多问题:
- 首先,我在网上找不到任何内容可以准确解释
treeAggregate
的工作原理,以及参数的含义。
- 其次,这里
.treeAggregate
好像方法名后面有两个()()。那是什么意思?那是一些我不明白的特殊 Scala 语法吗?
- 最后,我看到 seqOp 和 comboOp return 一个 3 元素的元组,它们与预期的左侧变量匹配,但实际上哪个被 returned?
这句话一定很高级。我无法开始破译这个。
treeAggregate
是 aggregate
的一个特殊实现,它将 combine 函数迭代地应用于分区的一个子集。这样做是为了防止将所有部分结果返回给驱动程序,在驱动程序中,将像经典 aggregate
那样发生单次传递减少。
出于所有实际目的,treeAggregate
遵循与此答案中解释的 aggregate
相同的原则: 除了它需要一个额外的参数来指示深度部分聚合级别。
让我试着具体解释一下这里发生了什么:
对于聚合,我们需要一个零、一个组合函数和一个缩减函数。
aggregate
使用 currying 独立于 combine 和 reduce 函数指定零值。
然后我们可以像这样剖析上面的函数。希望这有助于理解:
val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)
val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long) = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
}
然后我们可以以更易于理解的形式重写对 treeAggregate
的调用:
val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction)
此表单会将结果元组 'extract' 转换为命名值 gradientSum, lossSum, miniBatchSize
以供进一步使用。
请注意,treeAggregate
采用一个附加参数 depth
,该参数声明为默认值 depth = 2
,因此,由于此特定调用中未提供该参数,因此它将采用该默认值值。
我运行进入this line在Apache Spark代码源
val (gradientSum, lossSum, miniBatchSize) = data
.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
}
)
我在阅读这篇文章时遇到了很多问题:
- 首先,我在网上找不到任何内容可以准确解释
treeAggregate
的工作原理,以及参数的含义。 - 其次,这里
.treeAggregate
好像方法名后面有两个()()。那是什么意思?那是一些我不明白的特殊 Scala 语法吗? - 最后,我看到 seqOp 和 comboOp return 一个 3 元素的元组,它们与预期的左侧变量匹配,但实际上哪个被 returned?
这句话一定很高级。我无法开始破译这个。
treeAggregate
是 aggregate
的一个特殊实现,它将 combine 函数迭代地应用于分区的一个子集。这样做是为了防止将所有部分结果返回给驱动程序,在驱动程序中,将像经典 aggregate
那样发生单次传递减少。
出于所有实际目的,treeAggregate
遵循与此答案中解释的 aggregate
相同的原则:
让我试着具体解释一下这里发生了什么:
对于聚合,我们需要一个零、一个组合函数和一个缩减函数。
aggregate
使用 currying 独立于 combine 和 reduce 函数指定零值。
然后我们可以像这样剖析上面的函数。希望这有助于理解:
val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)
val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long) = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
}
然后我们可以以更易于理解的形式重写对 treeAggregate
的调用:
val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction)
此表单会将结果元组 'extract' 转换为命名值 gradientSum, lossSum, miniBatchSize
以供进一步使用。
请注意,treeAggregate
采用一个附加参数 depth
,该参数声明为默认值 depth = 2
,因此,由于此特定调用中未提供该参数,因此它将采用该默认值值。