Scala spark:如何训练分布式稀疏回归模型?
Scala spark: How to train a distributed sparse regression model?
我正在尝试构建一个回归模型,其中基础特征矩阵非常大(73K 列上有 418K 行)并且非常稀疏(58M 非零值,大约占整个矩阵的 0.2%)。
我将矩阵坐标表示为 DataFrame,其中第一列是行坐标 i
,第二列是列坐标 j
,第三列是 {i,j}
中的值第 th 个位置。
例如以下矩阵:
+-+-+-+
|0|1|0|
|2|0|0|
|0|0|3|
+-+-+-+
表示为
+-+-+-----+
|i|j|value|
+-+-+-----+
|0|1| 1 |
|1|0| 2 |
|2|2| 3 |
+-+-+-----+
我有一个单独的 DataFrame,其中包含每一行的标签 i
。
如果可能的话,我希望解决方案使用较新的 ml
库而不是较旧的 mllib
下面我给出一个小代码示例,说明如何在spark ml
中实现分布式稀疏线性回归。我已经在大型集群(Databricks Runtime 版本 6.5 ML - 包括 Apache Spark 2.4.5、Scala 2.11)上将它与相关矩阵一起使用,因此它可以很好地扩展并且只需几分钟即可执行。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.feature.LabeledPoint
import spark.implicits._
import org.apache.spark.ml.regression.LinearRegression
// Construct Matrix coordinate representation DataFrame
val df = Seq(
(0, 1, 14.0),
(0, 0, 13.0),
(1, 1, 11.0)
).toDF("i", "j", "value")
df.show()
+---+---+-----+
| i| j|value|
+---+---+-----+
| 0| 1| 14.0|
| 0| 0| 13.0|
| 1| 1| 11.0|
+---+---+-----+
// Construct label DataFrame
val df_label = Seq(
(0, 41.1),
(1, 21.9) // beta_1 = 1, beta_2 = 2
).toDF("i", "label")
df_label.show()
+---+-----+
| i|label|
+---+-----+
| 0| 41.1|
| 1| 21.9|
+---+-----+
// Use a UDF to sort arrays below
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
rows.map { case Row(j: Int, value: Double) => (j, value) }
.sortBy { case (j, value) => j }
})
// collect j and value columns to lists, make sure they are sorted by j
// then join with labels
val df_collected_with_labels = df
.groupBy("i")
.agg(collect_list(struct("j", "value")) as "j_value")
.select($"i", sortUdf(col("j_value")).alias("j_value_list"))
.withColumn("j_list", $"j_value_list".getField("_1"))
.withColumn("value_list", $"j_value_list".getField("_2"))
.drop("j_value_list")
.join(df_label, "i")
df_collected_with_labels.show()
+---+------+------------+-----+
| i|j_list| value_list|label|
+---+------+------------+-----+
| 1| [1]| [11.0]| 21.9|
| 0|[0, 1]|[13.0, 14.0]| 41.1|
+---+------+------------+-----+
val unique_j = df.dropDuplicates("j").count().toInt
val sparse_df = df_collected_with_labels
.map(r => LabeledPoint(r.getDouble(3),
new SparseVector(size = unique_j,
indices = r.getAs[Seq[Int]]("j_list").toArray,
values = r.getAs[Seq[Double]]("value_list").toArray)))
sparse_df.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 21.9| (2,[1],[11.0])|
| 41.1|(2,[0,1],[13.0,14...|
+-----+--------------------+
// Fit sparse regression!
val lr = new LinearRegression()
.setFitIntercept(false)
val lrModel = lr.fit(sparse_df)
lrModel.coefficients
org.apache.spark.ml.linalg.Vector = [1.0174825174825193,1.9909090909090894]
lrModel.predict(new SparseVector(size = unique_j, indices = Array(0), values = Array(4.0)))
Double = 4.069930069930077
我正在尝试构建一个回归模型,其中基础特征矩阵非常大(73K 列上有 418K 行)并且非常稀疏(58M 非零值,大约占整个矩阵的 0.2%)。
我将矩阵坐标表示为 DataFrame,其中第一列是行坐标 i
,第二列是列坐标 j
,第三列是 {i,j}
中的值第 th 个位置。
例如以下矩阵:
+-+-+-+
|0|1|0|
|2|0|0|
|0|0|3|
+-+-+-+
表示为
+-+-+-----+
|i|j|value|
+-+-+-----+
|0|1| 1 |
|1|0| 2 |
|2|2| 3 |
+-+-+-----+
我有一个单独的 DataFrame,其中包含每一行的标签 i
。
如果可能的话,我希望解决方案使用较新的 ml
库而不是较旧的 mllib
下面我给出一个小代码示例,说明如何在spark ml
中实现分布式稀疏线性回归。我已经在大型集群(Databricks Runtime 版本 6.5 ML - 包括 Apache Spark 2.4.5、Scala 2.11)上将它与相关矩阵一起使用,因此它可以很好地扩展并且只需几分钟即可执行。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.feature.LabeledPoint
import spark.implicits._
import org.apache.spark.ml.regression.LinearRegression
// Construct Matrix coordinate representation DataFrame
val df = Seq(
(0, 1, 14.0),
(0, 0, 13.0),
(1, 1, 11.0)
).toDF("i", "j", "value")
df.show()
+---+---+-----+
| i| j|value|
+---+---+-----+
| 0| 1| 14.0|
| 0| 0| 13.0|
| 1| 1| 11.0|
+---+---+-----+
// Construct label DataFrame
val df_label = Seq(
(0, 41.1),
(1, 21.9) // beta_1 = 1, beta_2 = 2
).toDF("i", "label")
df_label.show()
+---+-----+
| i|label|
+---+-----+
| 0| 41.1|
| 1| 21.9|
+---+-----+
// Use a UDF to sort arrays below
val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
rows.map { case Row(j: Int, value: Double) => (j, value) }
.sortBy { case (j, value) => j }
})
// collect j and value columns to lists, make sure they are sorted by j
// then join with labels
val df_collected_with_labels = df
.groupBy("i")
.agg(collect_list(struct("j", "value")) as "j_value")
.select($"i", sortUdf(col("j_value")).alias("j_value_list"))
.withColumn("j_list", $"j_value_list".getField("_1"))
.withColumn("value_list", $"j_value_list".getField("_2"))
.drop("j_value_list")
.join(df_label, "i")
df_collected_with_labels.show()
+---+------+------------+-----+
| i|j_list| value_list|label|
+---+------+------------+-----+
| 1| [1]| [11.0]| 21.9|
| 0|[0, 1]|[13.0, 14.0]| 41.1|
+---+------+------------+-----+
val unique_j = df.dropDuplicates("j").count().toInt
val sparse_df = df_collected_with_labels
.map(r => LabeledPoint(r.getDouble(3),
new SparseVector(size = unique_j,
indices = r.getAs[Seq[Int]]("j_list").toArray,
values = r.getAs[Seq[Double]]("value_list").toArray)))
sparse_df.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 21.9| (2,[1],[11.0])|
| 41.1|(2,[0,1],[13.0,14...|
+-----+--------------------+
// Fit sparse regression!
val lr = new LinearRegression()
.setFitIntercept(false)
val lrModel = lr.fit(sparse_df)
lrModel.coefficients
org.apache.spark.ml.linalg.Vector = [1.0174825174825193,1.9909090909090894]
lrModel.predict(new SparseVector(size = unique_j, indices = Array(0), values = Array(4.0)))
Double = 4.069930069930077