如何在 Spark 数据集中创建 TypedColumn 并对其进行操作?

How to create a TypedColumn in a Spark Dataset and manipulate it?

我正在尝试使用 mapGroups 执行聚合,returns 一个 SparseMatrix 作为列之一,并对列求和。

我为映射的行创建了一个 case class 模式以提供列名。矩阵列的类型为 org.apache.spark.mllib.linalg.Matrix。如果我在执行聚合之前不 运行 toDF (select(sum("mycolumn")),我会得到一个类型不匹配错误 (required: org.apache.spark.sql.TypedColumn[MySchema,?])。如果我包含 toDF,我会得到另一个类型不匹配错误:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT。那么正确的做法是什么?

看来您在这里至少遇到了两个不同的问题。假设您有这样的 Dataset

val ds = Seq(
  ("foo",  Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))), 
  ("foo",  Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS

选择TypedColumn:

  • 使用隐式转换 $:

    ds.select(col("_1").as[String])
    
  • 使用o.a.s.sql.functions.col:

    ds.select(col("_1").as[String])
    

相加矩阵:

  • MLLib MatrixMatrixUDT 不实现加法。这意味着您将无法 sum 函数或减少 +
  • 您可以使用第三方线性代数库,但这在 Spark SQL / Spark 数据集
  • 中不受支持

如果你真的想用 Datsets 来做,你可以尝试这样做:

ds.groupByKey(_._1).mapGroups(
  (key, values) => {
    val matrices = values.map(_._2.toArray)
    val first = matrices.next
    val sum = matrices.foldLeft(first)(
      (acc, m) => acc.zip(m).map { case (x, y) => x + y }
    )
    (key, sum)
})

并映射回矩阵,但我个人只是转换为 RDD 并使用 breeze.