如何从 Scala 中的 DataFrame 在 Spark 中创建分布式稀疏矩阵
How to create a distributed sparse matrix in Spark from DataFrame in Scala
问题
请帮助找到从 DataFrame 中的(用户、特征、值)记录创建分布式矩阵的方法,其中特征及其值存储在列中。
数据摘录如下,但用户和功能较多,并未针对用户测试所有功能。因此,许多特征值是空的,被归为 0。
例如,验血可能具有 糖水平、胆固醇水平 等特征。如果这些级别不可接受,则将 1 设置为该值。但并非所有功能都会针对用户(或患者)进行测试。
+----+-------+-----+
|user|feature|value|
+----+-------+-----+
| 14| 0| 1|
| 14| 222| 1|
| 14| 200| 1|
| 22| 0| 1|
| 22| 32| 1|
| 22| 147| 1|
| 22| 279| 1|
| 22| 330| 1|
| 22| 363| 1|
| 22| 162| 1|
| 22| 811| 1|
| 22| 290| 1|
| 22| 335| 1|
| 22| 681| 1|
| 22| 786| 1|
| 22| 789| 1|
| 22| 842| 1|
| 22| 856| 1|
| 22| 881| 1|
+----+-------+-----+
如果特征已经是列,那么有一些方法可以解释。
但事实并非如此。因此,一种方法可能是旋转数据框以应用这些方法。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 22| 1| 1| 1| 1| 0| 0| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
然后使用行到向量的转换。我想使用其中之一:
- 矢量汇编器
- org.apache.spark.mllib.linalg.Vectors.fromML
- org.apache.spark.mllib.linalg.distributed.MatrixEntry
但是,由于将有许多空值归因于 0,因此旋转数据帧将消耗更多内存 space。此外,旋转分布在多个节点之间的大型数据帧会导致大量改组。
因此,寻求意见、想法、建议。
相关
- How to convert a DataFrame to a Vector.dense in scala
- VectorAssembler
- Scalable Sparse Matrix Multiplication in Apache Spark
- Spark MLlib Data Types | Apache Spark Machine Learning
- Linear Algebra and Distributed Machine Learning in Scala using Breeze and MLlib
环境
Spark 2.4.4
也许您可以将每一行转换为 json 表示,例如:
{
"user": 14
"features" : [
{
"feature" : 0
"value" : 1
},
{
"feature" : 222
"value" : 1
}
]
}
但这一切都取决于您以后如何使用 "distributed matrix"。
解决方案
- 为每个输入行创建一个 RDD[(user, feature)]。
- groupByKey 创建一个RDD[(user, [feature+])].
- 创建一个 RDD[IndexedRow],其中每个 IndexedRow 代表下面所有现有的特征。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
- 将 RDD[IndexedRow] 转换为 IndexedRowMatrix。
乘积运算,将RowIndexedMatrix转换为支持分布式乘积运算的BlockMatrix
将每条原始记录转换为 IndexedRow
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
def toIndexedRow(userToFeaturesMap:(Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
userToFeaturesMap match {
case (userId, featureIDs) => {
val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
new IndexedRow (
userId,
Vectors.sparse(maxFeatureId + 1, featureCountKV)
)
}
}
}
val userToFeatureCounters= featureData.rdd
.map(rowPF => (rowPF.getInt(0), rowPF.getInt(1))) // Out from ROW[(userId, featureId)]
.groupByKey() // (userId, Iterable(featureId))
.map(
userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
) // IndexedRow(userId, Vector((featureId, 1)))
已创建 IndexedRowMatrix
val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)
通过 BlockMatrix 转置 IndexedRowMatrix,因为 IndexedRowMatrix 不支持转置
val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
.transpose
使用 BlockMatrix 作为 IndexedRowMatrix 创建的产品需要右侧的 Local DenseMatrix。
val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
.multiply(userFeatureBlockMatrixTransposed)
.toIndexedRowMatrix
问题
请帮助找到从 DataFrame 中的(用户、特征、值)记录创建分布式矩阵的方法,其中特征及其值存储在列中。
数据摘录如下,但用户和功能较多,并未针对用户测试所有功能。因此,许多特征值是空的,被归为 0。
例如,验血可能具有 糖水平、胆固醇水平 等特征。如果这些级别不可接受,则将 1 设置为该值。但并非所有功能都会针对用户(或患者)进行测试。
+----+-------+-----+
|user|feature|value|
+----+-------+-----+
| 14| 0| 1|
| 14| 222| 1|
| 14| 200| 1|
| 22| 0| 1|
| 22| 32| 1|
| 22| 147| 1|
| 22| 279| 1|
| 22| 330| 1|
| 22| 363| 1|
| 22| 162| 1|
| 22| 811| 1|
| 22| 290| 1|
| 22| 335| 1|
| 22| 681| 1|
| 22| 786| 1|
| 22| 789| 1|
| 22| 842| 1|
| 22| 856| 1|
| 22| 881| 1|
+----+-------+-----+
如果特征已经是列,那么有一些方法可以解释。
但事实并非如此。因此,一种方法可能是旋转数据框以应用这些方法。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 22| 1| 1| 1| 1| 0| 0| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
然后使用行到向量的转换。我想使用其中之一:
- 矢量汇编器
- org.apache.spark.mllib.linalg.Vectors.fromML
- org.apache.spark.mllib.linalg.distributed.MatrixEntry
但是,由于将有许多空值归因于 0,因此旋转数据帧将消耗更多内存 space。此外,旋转分布在多个节点之间的大型数据帧会导致大量改组。
因此,寻求意见、想法、建议。
相关
- How to convert a DataFrame to a Vector.dense in scala
- VectorAssembler
- Scalable Sparse Matrix Multiplication in Apache Spark
- Spark MLlib Data Types | Apache Spark Machine Learning
- Linear Algebra and Distributed Machine Learning in Scala using Breeze and MLlib
环境
Spark 2.4.4
也许您可以将每一行转换为 json 表示,例如:
{
"user": 14
"features" : [
{
"feature" : 0
"value" : 1
},
{
"feature" : 222
"value" : 1
}
]
}
但这一切都取决于您以后如何使用 "distributed matrix"。
解决方案
- 为每个输入行创建一个 RDD[(user, feature)]。
- groupByKey 创建一个RDD[(user, [feature+])].
- 创建一个 RDD[IndexedRow],其中每个 IndexedRow 代表下面所有现有的特征。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
- 将 RDD[IndexedRow] 转换为 IndexedRowMatrix。
乘积运算,将RowIndexedMatrix转换为支持分布式乘积运算的BlockMatrix
将每条原始记录转换为 IndexedRow
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
def toIndexedRow(userToFeaturesMap:(Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
userToFeaturesMap match {
case (userId, featureIDs) => {
val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
new IndexedRow (
userId,
Vectors.sparse(maxFeatureId + 1, featureCountKV)
)
}
}
}
val userToFeatureCounters= featureData.rdd
.map(rowPF => (rowPF.getInt(0), rowPF.getInt(1))) // Out from ROW[(userId, featureId)]
.groupByKey() // (userId, Iterable(featureId))
.map(
userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
) // IndexedRow(userId, Vector((featureId, 1)))
已创建 IndexedRowMatrix
val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)
通过 BlockMatrix 转置 IndexedRowMatrix,因为 IndexedRowMatrix 不支持转置
val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
.transpose
使用 BlockMatrix 作为 IndexedRowMatrix 创建的产品需要右侧的 Local DenseMatrix。
val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
.multiply(userFeatureBlockMatrixTransposed)
.toIndexedRowMatrix