Spark - 在数据集的几列上应用 UDF 并形成新列

Spark - Apply UDF on few columns of a Dataset and form new columns

我有一个字符串类型的数据集,我想在此数据集的某些列上应用一个函数,并根据列将它们转换为 Long 或 Double 或 Int 等,并附加新列(甚至是元组这些列的)到同一个数据集。有人可以建议这样做的正确方法吗?

更新:

以下失败:

ds.withColumn("newCol", Vectors.dense(strDoubleUDF(ds("col10")) + str2DoubleUDF(ds("col12")))

有错误

<console>:253: error: overloaded method value dense with alternatives:
  (values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
  (firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
 cannot be applied to (org.apache.spark.sql.Column, org.apache.spark.sql.Column)
                         Vectors.dense(str2DoubleUDF(ds("col10")),

这里有一个如何实现这个的例子:

val ds: Dataset[(String, String)] = Seq(
  ("1.0","1"),
  ("2.0","2"),
  ("3.0","3"),
  ("4.0","4")
).toDS()


val newDs: Dataset[(String, String, (Double, Int))] = ds
  .map{case (doubleStr,intStr) =>
  (doubleStr,
    intStr,
    (doubleStr.toDouble,intStr.toInt) // new struct/tuple column 
  )
}

没有对制作 Vector 的内置支持,因此您应该使用 UDF:

val vectorUDF = udf ((col1 : Seq[Double], col2 : Seq[Double]) => {
    Vectors.dense(col1 + col2)
});
ds.withColumn("newCol", vectorUDF(strDoubleUDF(ds("col10")), str2DoubleUDF(ds("col12")))