将 groupByKey() 替换为 reduceByKey()
Replace groupByKey() with reduceByKey()
这是 . I am trying to implement k-means based on this implementation 的后续问题。它工作得很好,但是 我想用 reduceByKey()
替换 groupByKey()
,但我不确定如何(我现在不担心性能)。这是相关的缩小代码:
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val read_mean_centroids = sc.textFile("centroids.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
..
注意 println(newCentroids)
会给出:
Map(23 -> (-6.269305E-4, -0.0011746404, -4.08004E-5), 8 -> (-5.108732E-4, 7.336348E-4, -3.707591E-4), 17 -> (-0.0016383086, -0.0016974678, 1.45..
和println(closest)
:
MapPartitionsRDD[6] at map at kmeans.scala:75
相关问题:Using reduceByKey in Apache Spark (Scala).
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
使用关联归约函数合并每个键的值。
def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]
使用关联归约函数合并每个键的值。
def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
使用关联归约函数合并每个键的值。
def groupByKey(): RDD[(K, Iterable[V])]
将 RDD 中每个键的值分组为一个序列。
您可以像这样使用 aggregateByKey()
(比 reduceByKey()
更自然一点)来计算 newCentroids
:
val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
(agg, v) => (agg._1 += v, agg._2 + 1L),
(agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)
).mapValues(agg => agg._1/agg._2).collectAsMap
为此,您需要计算数据的维度,即 dim
,但您只需执行一次。你可能会使用像 val dim = data.first._2.length
.
这样的东西
这是 reduceByKey()
替换 groupByKey()
,但我不确定如何(我现在不担心性能)。这是相关的缩小代码:
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val read_mean_centroids = sc.textFile("centroids.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
..
注意 println(newCentroids)
会给出:
Map(23 -> (-6.269305E-4, -0.0011746404, -4.08004E-5), 8 -> (-5.108732E-4, 7.336348E-4, -3.707591E-4), 17 -> (-0.0016383086, -0.0016974678, 1.45..
和println(closest)
:
MapPartitionsRDD[6] at map at kmeans.scala:75
相关问题:Using reduceByKey in Apache Spark (Scala).
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
使用关联归约函数合并每个键的值。
def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]
使用关联归约函数合并每个键的值。
def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
使用关联归约函数合并每个键的值。
def groupByKey(): RDD[(K, Iterable[V])]
将 RDD 中每个键的值分组为一个序列。
您可以像这样使用 aggregateByKey()
(比 reduceByKey()
更自然一点)来计算 newCentroids
:
val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
(agg, v) => (agg._1 += v, agg._2 + 1L),
(agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)
).mapValues(agg => agg._1/agg._2).collectAsMap
为此,您需要计算数据的维度,即 dim
,但您只需执行一次。你可能会使用像 val dim = data.first._2.length
.