如何将数据从 DataFrame 准备为 LibSVM 格式?
How to prepare data into a LibSVM format from DataFrame?
我想做成libsvm格式,所以我把dataframe做成了我想要的格式,但是我不知道怎么转换成libsvm格式。格式如图所示。我希望所需的 libsvm 类型是 user item:rating 。如果你知道在当前情况下该怎么做:
val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
val fields = line.split(",")
(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey
val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}
val data_DF = data.toDF("user","item","rating")
我正在使用 Spark 2.0。
您遇到的问题可以分为以下几种:
- 将您的评分(我相信)转换为
LabeledPoint
数据 X。
- 以 libsvm 格式保存 X。
1.将您的评分转换为 LabeledPoint
数据 X
让我们考虑以下原始评分:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
您可以将这些原始评分处理为 coordinate list matrix (COO)。
Spark 实现了一个由其条目的 RDD 支持的分布式矩阵:CoordinateMatrix
其中每个条目都是 (i: Long, j: Long, value: Double) 的元组。
注意:仅当矩阵的两个维度都很大且矩阵非常稀疏时才应使用 CoordinateMatrix。(这通常是 user/item 评级的情况。)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
现在让我们将 RDD[MatrixEntry]
转换为 CoordinateMatrix
并提取索引行:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
2。以 libsvm 格式
保存 LabeledPoint 数据
从 Spark 2.0 开始,您可以使用 DataFrameWriter
来做到这一点。让我们创建一个带有一些虚拟 LabeledPoint 数据的小示例(您也可以使用我们之前创建的 DataFrame
):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
不幸的是,我们仍然不能直接使用 DataFrameWriter
,因为虽然大多数管道组件支持加载的向后兼容性,但 2.0 之前的 Spark 版本中的一些现有 DataFrames 和管道,包含向量或矩阵列,可能需要迁移到新的 spark.ml 向量和矩阵类型。
可以在 org.apache.spark.mllib.util.MLUtils.
中找到将 DataFrame 列从 mllib.linalg
类型转换为 ml.linalg
类型(反之亦然)的实用程序。在我们的例子中,我们需要执行以下操作(对于虚拟数据和来自 step 1.
)
的 DataFrame
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
现在让我们保存 DataFrame :
convertedVecDF.write.format("libsvm").save("data/foo")
我们可以检查文件内容:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
编辑:
在当前版本的 spark (2.1.0) 中,不需要使用 mllib
包。您可以简单地将 LabeledPoint
数据保存为 libsvm 格式,如下所示:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")
为了将现有的转换为类型化的DataSet
,我建议如下;使用以下情况 class:
case class LibSvmEntry (
value: Double,
features: L.Vector)
您可以使用 map
函数将其转换为 LibSVM 条目,如下所示:
df.map[LibSvmEntry](r: Row => /* Do your stuff here*/)
libsvm数据类型特征是一个稀疏向量,你可以用pyspark.ml.linalg.SparseVector解决问题
a = SparseVector(4, [1, 3], [3.0, 4.0])
def sparsevecfuc(len,index,score):
"""
args: len int, index array, score array
"""
return SparseVector(len,index,score)
trans_sparse = udf(sparsevecfuc,VectorUDT())
我想做成libsvm格式,所以我把dataframe做成了我想要的格式,但是我不知道怎么转换成libsvm格式。格式如图所示。我希望所需的 libsvm 类型是 user item:rating 。如果你知道在当前情况下该怎么做:
val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
val fields = line.split(",")
(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey
val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}
val data_DF = data.toDF("user","item","rating")
我正在使用 Spark 2.0。
您遇到的问题可以分为以下几种:
- 将您的评分(我相信)转换为
LabeledPoint
数据 X。 - 以 libsvm 格式保存 X。
1.将您的评分转换为 LabeledPoint
数据 X
让我们考虑以下原始评分:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
您可以将这些原始评分处理为 coordinate list matrix (COO)。
Spark 实现了一个由其条目的 RDD 支持的分布式矩阵:CoordinateMatrix
其中每个条目都是 (i: Long, j: Long, value: Double) 的元组。
注意:仅当矩阵的两个维度都很大且矩阵非常稀疏时才应使用 CoordinateMatrix。(这通常是 user/item 评级的情况。)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
现在让我们将 RDD[MatrixEntry]
转换为 CoordinateMatrix
并提取索引行:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
2。以 libsvm 格式
保存 LabeledPoint 数据从 Spark 2.0 开始,您可以使用 DataFrameWriter
来做到这一点。让我们创建一个带有一些虚拟 LabeledPoint 数据的小示例(您也可以使用我们之前创建的 DataFrame
):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
不幸的是,我们仍然不能直接使用 DataFrameWriter
,因为虽然大多数管道组件支持加载的向后兼容性,但 2.0 之前的 Spark 版本中的一些现有 DataFrames 和管道,包含向量或矩阵列,可能需要迁移到新的 spark.ml 向量和矩阵类型。
可以在 org.apache.spark.mllib.util.MLUtils.
中找到将 DataFrame 列从 mllib.linalg
类型转换为 ml.linalg
类型(反之亦然)的实用程序。在我们的例子中,我们需要执行以下操作(对于虚拟数据和来自 step 1.
)
DataFrame
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
现在让我们保存 DataFrame :
convertedVecDF.write.format("libsvm").save("data/foo")
我们可以检查文件内容:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
编辑:
在当前版本的 spark (2.1.0) 中,不需要使用 mllib
包。您可以简单地将 LabeledPoint
数据保存为 libsvm 格式,如下所示:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")
为了将现有的转换为类型化的DataSet
,我建议如下;使用以下情况 class:
case class LibSvmEntry (
value: Double,
features: L.Vector)
您可以使用 map
函数将其转换为 LibSVM 条目,如下所示:
df.map[LibSvmEntry](r: Row => /* Do your stuff here*/)
libsvm数据类型特征是一个稀疏向量,你可以用pyspark.ml.linalg.SparseVector解决问题
a = SparseVector(4, [1, 3], [3.0, 4.0])
def sparsevecfuc(len,index,score):
"""
args: len int, index array, score array
"""
return SparseVector(len,index,score)
trans_sparse = udf(sparsevecfuc,VectorUDT())