Spark MLLib Kmeans 来自数据帧,然后再次返回
Spark MLLib Kmeans from dataframe, and back again
我的目标是使用 Spark (1.3.1) MLLib 将 kmeans 聚类算法应用于非常大的数据集。我已经使用 Spark 的 hiveContext 从 HDFS 调用了数据,并最终想以这种方式将其放回原处 - 以这种格式
|I.D |cluster |
===================
|546 |2 |
|6534 |4 |
|236 |5 |
|875 |2 |
我有 运行 以下代码,其中 "data" 是一个双精度数据帧,以及第一列的 ID。
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
运行成功,我现在无法将集群映射回它们各自的 ID,在数据框中,如上所述。我可以将其转换为数据帧:
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
但这就是我所知道的。 This post is on the right track, and 我想我问的是一个类似的问题。
我怀疑需要 labeledPoint 库。任何意见,答案将不胜感激,干杯。
编辑:刚刚在 Spark 用户列表中找到 this,看起来很有希望
我正在使用 pySpark 做类似的事情。我猜您可以直接将其转换为 Scala,因为没有 python 特定内容。 myPointsWithID 是我的 RDD,每个点都有一个 ID,该点表示为一个值数组。
# Get an RDD of only the vectors representing the points to be clustered
points = myPointsWithID.map(lambda (id, point): point)
clusters = KMeans.train(points,
100,
maxIterations=100,
runs=50,
initializationMode='random')
# For each point in the original RDD, replace the point with the
# ID of the cluster the point belongs to.
clustersBC = sc.broadcast(clusters)
pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))
我了解到您想在最后获取DataFrame。我看到两种可能的解决方案。我会说在它们之间进行选择是品味问题。
从 RDD 创建列
以RDD形式获取ids和簇对非常容易:
val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
然后从中创建 DataFrame
val idCluster = idClusterRDD.toDF("id", "cluster")
之所以有效,是因为 map 不会更改 RDD 中数据的顺序,这就是为什么您可以将 id 与预测结果一起压缩的原因。
使用 UDF(用户定义函数)
第二种方法涉及使用 clusters.predict
方法作为 UDF:
val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)
现在我们可以使用它向数据添加预测:
val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
请记住,Spark API 不允许 UDF 注销。这意味着闭包数据将保存在内存中。
错误/非最佳解决方案
使用clusters.predict不广播
它在分布式设置中不起作用。编辑:实际上它会起作用,我被使用广播的implementation of predict for RDD弄糊涂了。
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
toArray
收集驱动程序中的所有数据。这意味着在分布式模式下,您会将集群 ID 复制到一个节点中。
让我知道此代码是否适合您:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._
val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.map{r => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))}
val df = predictions.toDF("id", "cluster")
df.show
根据你的代码,我假设:
data
是一个包含三列的 DataFrame(label: Double
、x1: Double
和 x2: Double
)
- 您希望
KMeans.predict
使用 x1
和 x2
来进行集群分配 closestCluster: Int
- 结果数据框的格式应为 (
label: Double
, closestCluster: Int
)
这是一个简单的示例应用程序,其中包含一些遵循假定模式的玩具数据:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions.{col, udf}
case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
DataRow(3, 1, 2),
DataRow(5, 3, 4),
DataRow(7, 5, 6),
DataRow(6, 0, 0)
)))
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
val t = udf { (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) }
val result = data.select(col("label"), t(col("x1"), col("x2")))
重要的部分是最后两行。
创建一个UDF(用户定义的函数),它可以直接应用于Dataframe列(在本例中,两列x1
和 x2
).
选择 label
列以及应用于 x1
和 x2
列的 UDF。由于 UDF 将预测 closestCluster
,在此之后 result
将是一个由 (label
, closestCluster
)
组成的 Dataframe
我的目标是使用 Spark (1.3.1) MLLib 将 kmeans 聚类算法应用于非常大的数据集。我已经使用 Spark 的 hiveContext 从 HDFS 调用了数据,并最终想以这种方式将其放回原处 - 以这种格式
|I.D |cluster |
===================
|546 |2 |
|6534 |4 |
|236 |5 |
|875 |2 |
我有 运行 以下代码,其中 "data" 是一个双精度数据帧,以及第一列的 ID。
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
运行成功,我现在无法将集群映射回它们各自的 ID,在数据框中,如上所述。我可以将其转换为数据帧:
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
但这就是我所知道的。 This post is on the right track, and
我怀疑需要 labeledPoint 库。任何意见,答案将不胜感激,干杯。
编辑:刚刚在 Spark 用户列表中找到 this,看起来很有希望
我正在使用 pySpark 做类似的事情。我猜您可以直接将其转换为 Scala,因为没有 python 特定内容。 myPointsWithID 是我的 RDD,每个点都有一个 ID,该点表示为一个值数组。
# Get an RDD of only the vectors representing the points to be clustered
points = myPointsWithID.map(lambda (id, point): point)
clusters = KMeans.train(points,
100,
maxIterations=100,
runs=50,
initializationMode='random')
# For each point in the original RDD, replace the point with the
# ID of the cluster the point belongs to.
clustersBC = sc.broadcast(clusters)
pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))
我了解到您想在最后获取DataFrame。我看到两种可能的解决方案。我会说在它们之间进行选择是品味问题。
从 RDD 创建列
以RDD形式获取ids和簇对非常容易:
val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
然后从中创建 DataFrame
val idCluster = idClusterRDD.toDF("id", "cluster")
之所以有效,是因为 map 不会更改 RDD 中数据的顺序,这就是为什么您可以将 id 与预测结果一起压缩的原因。
使用 UDF(用户定义函数)
第二种方法涉及使用 clusters.predict
方法作为 UDF:
val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)
现在我们可以使用它向数据添加预测:
val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
请记住,Spark API 不允许 UDF 注销。这意味着闭包数据将保存在内存中。
错误/非最佳解决方案
使用clusters.predict不广播
它在分布式设置中不起作用。编辑:实际上它会起作用,我被使用广播的implementation of predict for RDD弄糊涂了。
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
toArray
收集驱动程序中的所有数据。这意味着在分布式模式下,您会将集群 ID 复制到一个节点中。
让我知道此代码是否适合您:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._
val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.map{r => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))}
val df = predictions.toDF("id", "cluster")
df.show
根据你的代码,我假设:
data
是一个包含三列的 DataFrame(label: Double
、x1: Double
和x2: Double
)- 您希望
KMeans.predict
使用x1
和x2
来进行集群分配closestCluster: Int
- 结果数据框的格式应为 (
label: Double
,closestCluster: Int
)
这是一个简单的示例应用程序,其中包含一些遵循假定模式的玩具数据:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions.{col, udf}
case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
DataRow(3, 1, 2),
DataRow(5, 3, 4),
DataRow(7, 5, 6),
DataRow(6, 0, 0)
)))
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
val t = udf { (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) }
val result = data.select(col("label"), t(col("x1"), col("x2")))
重要的部分是最后两行。
创建一个UDF(用户定义的函数),它可以直接应用于Dataframe列(在本例中,两列
x1
和x2
).选择
label
列以及应用于x1
和x2
列的 UDF。由于 UDF 将预测closestCluster
,在此之后result
将是一个由 (label
,closestCluster
) 组成的 Dataframe