What is the maxIter parameter in MultilayerPerceptronClassifier (Spark MLlib)?
1. Does maxIter set the maximum number of steps the optimization algorithm is allowed to take in order to find the minimum error?
or
2. Does maxIter set the maximum number of epochs (the maximum number of times the whole dataset is passed through the network)?
class pyspark.ml.classification.MultilayerPerceptronClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, tol=1e-06, seed=None, layers=None, blockSize=128, stepSize=0.03, solver='l-bfgs', initialWeights=None, probabilityCol='probability', rawPredictionCol='rawPrediction')
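For context, here is a minimal sketch (not from the original question) of where maxIter is set when training the classifier in PySpark; the layer sizes and toy data are made up purely for illustration:

# Hypothetical example: where maxIter fits in a PySpark training run.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import MultilayerPerceptronClassifier

spark = SparkSession.builder.getOrCreate()

# Toy dataset: 4 features, 2 classes (values are illustrative only).
df = spark.createDataFrame([
    (0.0, Vectors.dense([0.0, 0.0, 0.0, 0.0])),
    (1.0, Vectors.dense([1.0, 1.0, 1.0, 1.0])),
], ["label", "features"])

# layers: 4 inputs -> 5 hidden units -> 2 output classes.
# maxIter=100 caps the number of optimizer iterations; the question is
# whether each such iteration amounts to one epoch over the dataset.
mlp = MultilayerPerceptronClassifier(layers=[4, 5, 2], maxIter=100,
                                     solver="l-bfgs", seed=42)
model = mlp.fit(df)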
The Spark gradient optimizers work through the RDD treeAggregate function. On each iteration they take a fraction of the RDD (1.0 by default, i.e. the whole dataset) and distribute the gradient computation to the workers, so each iteration processes the entire RDD. In that case one iteration can be considered one epoch. This approach keeps the optimization process simple on Spark. There are more advanced deep-learning optimizer implementations, such as BigDL, that let you set a batch size and use the BlockManager to compute the distributed gradient aggregation on every iteration; there, one iteration corresponds to one mini-batch.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
/**
 * Aggregates the elements of this RDD in a multi-level tree pattern.
 * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]].
 *
 * @param depth suggested depth of the tree (default: 2)
 */
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
    }
    val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
  }
}
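To make the "one iteration is effectively one epoch" point concrete, here is a hedged, simplified sketch (not Spark's actual MLP implementation) of the pattern described above: every optimizer iteration aggregates a gradient over the entire RDD with treeAggregate. The data, loss function, and step size are illustrative only.

# Simplified, hypothetical gradient loop: one iteration = one full pass.
import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Hypothetical dataset: (features, label) pairs for a tiny linear model.
data = sc.parallelize([(np.array([1.0, 2.0]), 1.0),
                       (np.array([2.0, 0.5]), 0.0)] * 100).cache()
n = data.count()

w = np.zeros(2)     # model weights
max_iter = 10       # plays the role of maxIter
step_size = 0.03

for _ in range(max_iter):
    w_bc = sc.broadcast(w)  # ship the current weights to the workers

    def seq_op(grad, point):
        x, y = point
        # squared-error gradient contribution of a single example
        return grad + (w_bc.value.dot(x) - y) * x

    # One "iteration": a gradient aggregated over the whole dataset,
    # combined across partitions in a tree pattern.
    grad = data.treeAggregate(np.zeros(2), seq_op, lambda a, b: a + b, depth=2)
    w = w - step_size * grad / n

Because the aggregation always covers the full RDD, capping max_iter here caps the number of full passes, which is why, for this style of optimizer, maxIter behaves like a maximum number of epochs (interpretation 2 above).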