每行加入两个不同的 RDDs 在一个 - Scala
Join per line two different RDDs in just one - Scala
我正在 Spark-Scala 中编写 K-means 算法。
我的模型预测每个点在哪个簇中。
数据
-6.59 -44.68
-35.73 39.93
47.54 -52.04
23.78 46.82
....
加载数据
val data = sc.textFile("/home/borja/flink/kmeans/points")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
使用 KMeans
将数据聚类为两个 类
val numClusters = 10
val numIterations = 100
val clusters = KMeans.train(parsedData, numClusters, numIterations)
预测
val prediction = clusters.predict(parsedData)
但是,我需要将结果和点放在同一个文件中,格式如下:
[no title, numberOfCluster (1,2,3,..10), pointX, pointY]:
6 -6.59 -44.68
8 -35.73 39.93
10 47.54 -52.04
7 23.78 46.82
这是 Python 中 this executable 的条目,可以打印非常好的结果。
但我尽了最大努力得到了这个:
(您可以检查第一个数字是否有误:68、384、...)
var i = 0
val c = sc.parallelize(data.collect().map(x => {
val tuple = (i, x)
i += 1
tuple
}))
i = 0
val c2 = sc.parallelize(prediction.collect().map(x => {
val tuple = (i, x)
i += 1
tuple
}))
val result = c.join(c2)
result.take(5)
结果:
res94: Array[(Int, (String, Int))] = Array((68,(17.79 13.69,0)), (384,(-33.47 -4.87,8)), (440,(-4.75 -42.21,1)), (4,(-33.31 -13.11,6)), (324,(-39.04 -16.68,6)))
感谢您的帮助! :)
我没有方便测试的 spark 集群,但像这样的东西应该可以工作:
val result = parsedData.map { v =>
val cluster = clusters.predict(v)
s"$cluster ${v(0)} ${v(1)}"
}
result.saveAsTextFile("/some/output/path")
我正在 Spark-Scala 中编写 K-means 算法。 我的模型预测每个点在哪个簇中。
数据
-6.59 -44.68
-35.73 39.93
47.54 -52.04
23.78 46.82
....
加载数据
val data = sc.textFile("/home/borja/flink/kmeans/points")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
使用 KMeans
将数据聚类为两个 类val numClusters = 10
val numIterations = 100
val clusters = KMeans.train(parsedData, numClusters, numIterations)
预测
val prediction = clusters.predict(parsedData)
但是,我需要将结果和点放在同一个文件中,格式如下:
[no title, numberOfCluster (1,2,3,..10), pointX, pointY]:
6 -6.59 -44.68
8 -35.73 39.93
10 47.54 -52.04
7 23.78 46.82
这是 Python 中 this executable 的条目,可以打印非常好的结果。
但我尽了最大努力得到了这个: (您可以检查第一个数字是否有误:68、384、...)
var i = 0
val c = sc.parallelize(data.collect().map(x => {
val tuple = (i, x)
i += 1
tuple
}))
i = 0
val c2 = sc.parallelize(prediction.collect().map(x => {
val tuple = (i, x)
i += 1
tuple
}))
val result = c.join(c2)
result.take(5)
结果:
res94: Array[(Int, (String, Int))] = Array((68,(17.79 13.69,0)), (384,(-33.47 -4.87,8)), (440,(-4.75 -42.21,1)), (4,(-33.31 -13.11,6)), (324,(-39.04 -16.68,6)))
感谢您的帮助! :)
我没有方便测试的 spark 集群,但像这样的东西应该可以工作:
val result = parsedData.map { v =>
val cluster = clusters.predict(v)
s"$cluster ${v(0)} ${v(1)}"
}
result.saveAsTextFile("/some/output/path")