如何在 Spark 数据集中创建 TypedColumn 并对其进行操作?
How to create a TypedColumn in a Spark Dataset and manipulate it?
我正在尝试使用 mapGroups
执行聚合,returns 一个 SparseMatrix 作为列之一,并对列求和。
我为映射的行创建了一个 case class
模式以提供列名。矩阵列的类型为 org.apache.spark.mllib.linalg.Matrix
。如果我在执行聚合之前不 运行 toDF
(select(sum("mycolumn")
),我会得到一个类型不匹配错误 (required: org.apache.spark.sql.TypedColumn[MySchema,?]
)。如果我包含 toDF
,我会得到另一个类型不匹配错误:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT
。那么正确的做法是什么?
看来您在这里至少遇到了两个不同的问题。假设您有这样的 Dataset
:
val ds = Seq(
("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))),
("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS
选择TypedColumn
:
使用隐式转换 $
:
ds.select(col("_1").as[String])
使用o.a.s.sql.functions.col
:
ds.select(col("_1").as[String])
相加矩阵:
- MLLib
Matrix
和 MatrixUDT
不实现加法。这意味着您将无法 sum
函数或减少 +
- 您可以使用第三方线性代数库,但这在 Spark SQL / Spark 数据集
中不受支持
如果你真的想用 Datsets
来做,你可以尝试这样做:
ds.groupByKey(_._1).mapGroups(
(key, values) => {
val matrices = values.map(_._2.toArray)
val first = matrices.next
val sum = matrices.foldLeft(first)(
(acc, m) => acc.zip(m).map { case (x, y) => x + y }
)
(key, sum)
})
并映射回矩阵,但我个人只是转换为 RDD 并使用 breeze
.
我正在尝试使用 mapGroups
执行聚合,returns 一个 SparseMatrix 作为列之一,并对列求和。
我为映射的行创建了一个 case class
模式以提供列名。矩阵列的类型为 org.apache.spark.mllib.linalg.Matrix
。如果我在执行聚合之前不 运行 toDF
(select(sum("mycolumn")
),我会得到一个类型不匹配错误 (required: org.apache.spark.sql.TypedColumn[MySchema,?]
)。如果我包含 toDF
,我会得到另一个类型不匹配错误:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT
。那么正确的做法是什么?
看来您在这里至少遇到了两个不同的问题。假设您有这样的 Dataset
:
val ds = Seq(
("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))),
("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS
选择TypedColumn
:
使用隐式转换
$
:ds.select(col("_1").as[String])
使用
o.a.s.sql.functions.col
:ds.select(col("_1").as[String])
相加矩阵:
- MLLib
Matrix
和MatrixUDT
不实现加法。这意味着您将无法sum
函数或减少+
- 您可以使用第三方线性代数库,但这在 Spark SQL / Spark 数据集 中不受支持
如果你真的想用 Datsets
来做,你可以尝试这样做:
ds.groupByKey(_._1).mapGroups(
(key, values) => {
val matrices = values.map(_._2.toArray)
val first = matrices.next
val sum = matrices.foldLeft(first)(
(acc, m) => acc.zip(m).map { case (x, y) => x + y }
)
(key, sum)
})
并映射回矩阵,但我个人只是转换为 RDD 并使用 breeze
.