Spark - 在数据集的几列上应用 UDF 并形成新列
Spark - Apply UDF on few columns of a Dataset and form new columns
我有一个字符串类型的数据集,我想在此数据集的某些列上应用一个函数,并根据列将它们转换为 Long 或 Double 或 Int 等,并附加新列(甚至是元组这些列的)到同一个数据集。有人可以建议这样做的正确方法吗?
更新:
以下失败:
ds.withColumn("newCol", Vectors.dense(strDoubleUDF(ds("col10")) + str2DoubleUDF(ds("col12")))
有错误
<console>:253: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (org.apache.spark.sql.Column, org.apache.spark.sql.Column)
Vectors.dense(str2DoubleUDF(ds("col10")),
这里有一个如何实现这个的例子:
val ds: Dataset[(String, String)] = Seq(
("1.0","1"),
("2.0","2"),
("3.0","3"),
("4.0","4")
).toDS()
val newDs: Dataset[(String, String, (Double, Int))] = ds
.map{case (doubleStr,intStr) =>
(doubleStr,
intStr,
(doubleStr.toDouble,intStr.toInt) // new struct/tuple column
)
}
没有对制作 Vector 的内置支持,因此您应该使用 UDF:
val vectorUDF = udf ((col1 : Seq[Double], col2 : Seq[Double]) => {
Vectors.dense(col1 + col2)
});
ds.withColumn("newCol", vectorUDF(strDoubleUDF(ds("col10")), str2DoubleUDF(ds("col12")))
我有一个字符串类型的数据集,我想在此数据集的某些列上应用一个函数,并根据列将它们转换为 Long 或 Double 或 Int 等,并附加新列(甚至是元组这些列的)到同一个数据集。有人可以建议这样做的正确方法吗?
更新:
以下失败:
ds.withColumn("newCol", Vectors.dense(strDoubleUDF(ds("col10")) + str2DoubleUDF(ds("col12")))
有错误
<console>:253: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (org.apache.spark.sql.Column, org.apache.spark.sql.Column)
Vectors.dense(str2DoubleUDF(ds("col10")),
这里有一个如何实现这个的例子:
val ds: Dataset[(String, String)] = Seq(
("1.0","1"),
("2.0","2"),
("3.0","3"),
("4.0","4")
).toDS()
val newDs: Dataset[(String, String, (Double, Int))] = ds
.map{case (doubleStr,intStr) =>
(doubleStr,
intStr,
(doubleStr.toDouble,intStr.toInt) // new struct/tuple column
)
}
没有对制作 Vector 的内置支持,因此您应该使用 UDF:
val vectorUDF = udf ((col1 : Seq[Double], col2 : Seq[Double]) => {
Vectors.dense(col1 + col2)
});
ds.withColumn("newCol", vectorUDF(strDoubleUDF(ds("col10")), str2DoubleUDF(ds("col12")))