Spark: Summary statistics
I am trying to use Spark summary statistics as described at: https://spark.apache.org/docs/1.1.0/mllib-statistics.html
According to the Spark docs:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.DenseVector
val observations: RDD[Vector] = ... // an RDD of Vectors
// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
I am having trouble building the observations: RDD[Vector] object. I tried:
scala> val data:Array[Double] = Array(1, 2, 3, 4, 5)
data: Array[Double] = Array(1.0, 2.0, 3.0, 4.0, 5.0)
scala> val v = new DenseVector(data)
v: org.apache.spark.mllib.linalg.DenseVector = [1.0,2.0,3.0,4.0,5.0]
scala> val observations = sc.parallelize(Array(v))
observations: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.DenseVector] = ParallelCollectionRDD[3] at parallelize at <console>:19
scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
<console>:21: error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.DenseVector]
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
Note: org.apache.spark.mllib.linalg.DenseVector <: org.apache.spark.mllib.linalg.Vector, but class RDD is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
Questions:
1) How do I cast the DenseVector to a Vector?
2) In the real program, instead of an array of doubles, I need to get statistics on a collection obtained from an RDD with:
def countByKey(): Map[K, Long]
//Count the number of elements for each key, and return the result to the master as a Map.
So I would have to do:
myRdd.countByKey().values.map(_.toDouble)
This does not make much sense, because instead of working with RDDs I am now forced to work with regular Scala collections, which at some point no longer fit in memory. All the advantages of Spark's distributed computation are lost.
How can I solve this in a scalable way?
Update
In my case I have:
val cnts: org.apache.spark.rdd.RDD[Int] = prodCntByCity.map(_._2) // get product counts only
val doubleCnts: org.apache.spark.rdd.RDD[Double] = cnts.map(_.toDouble)
How do I turn doubleCnts into observations: RDD[Vector]?
1) You don't need the cast; you just need to type:
val observations = sc.parallelize(Array(v: Vector))
2) Use aggregateByKey (map all keys to 1 and reduce by summing) instead of countByKey; see the sketch after this answer.
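A minimal sketch of that approach, assuming myRdd is a pair RDD such as RDD[(String, String)] (the concrete key/value types are an assumption here):
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD

// Count elements per key on the cluster instead of collecting a Map to the driver.
val countsByKey: RDD[(String, Long)] =
  myRdd
    .map { case (k, _) => (k, 1L) }       // map every key to 1
    .aggregateByKey(0L)(_ + _, _ + _)     // sum within partitions, then across partitions

// Wrap each count in a one-element vector so Statistics.colStats can consume it.
val observations: RDD[Vector] =
  countsByKey.map { case (_, cnt) => Vectors.dense(cnt.toDouble) }

val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
println(summary.mean)      // per-column mean of the counts
println(summary.variance)  // per-column variance of the counts
This way the per-key counts stay distributed, and only the small summary object comes back to the driver.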
DenseVector has a compressed method, so you can turn an RDD[DenseVector] into an RDD[Vector] like this:
val st = observations.map(x => x.compressed)
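A quick usage check (a sketch, reusing the observations RDD[DenseVector] from the question, and assuming a Spark version in which Vector.compressed is available): compressed returns whichever of the dense or sparse representations is smaller, but its static type is Vector, so the mapped RDD type-checks against colStats:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD

val st: RDD[Vector] = observations.map(x => x.compressed)   // RDD[DenseVector] => RDD[Vector]
val summary: MultivariateStatisticalSummary = Statistics.colStats(st)
println(summary.mean)  // with the single vector above, this is just [1.0,2.0,3.0,4.0,5.0]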