使用 UDF 在 Spark DataFrame 中创建一个新列
Create a new column in Spark DataFrame using UDF
我有一个 UDF 如下 -
val myUdf = udf((col_abc: String, col_xyz: String) => {
array(
struct(
lit("x").alias("col1"),
col(col_abc).alias("col2"),
col(col_xyz).alias("col3")
)
)
}
现在,我想在下面的函数中使用它 -
def myfunc(): Column = {
val myvariable = myUdf($"col_abc", $"col_xyz")
myvariable
}
然后使用此函数在我的 DataFrame 中创建一个新列
val df = df..withColumn("new_col", myfunc())
总而言之,我希望我的列“new_col”是一个值为 [[x, x, x]]
的类型数组
我收到以下错误。我在这里做错了什么?
Caused by: java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
两种方式。
- 不要使用 UDF,因为您使用的是纯 Spark 函数:
val myUdf = ((col_abc: String, col_xyz: String) => {
array(
struct(
lit("x").alias("col1"),
col(col_abc).alias("col2"),
col(col_xyz).alias("col3")
)
)
}
)
def myfunc(): Column = {
val myvariable = myUdf("col_abc", "col_xyz")
myvariable
}
df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz| new_col|
+-------+-------+---------------+
| abc| xyz|[[x, abc, xyz]]|
+-------+-------+---------------+
- 使用接受字符串的 UDF 和 returns 大小写序列 class:
case class cols (col1: String, col2: String, col3: String)
val myUdf = udf((col_abc: String, col_xyz: String) => Seq(cols("x", col_abc, col_xyz)))
def myfunc(): Column = {
val myvariable = myUdf($"col_abc", $"col_xyz")
myvariable
}
df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz| new_col|
+-------+-------+---------------+
| abc| xyz|[[x, abc, xyz]]|
+-------+-------+---------------+
如果你想将 Columns 传递给函数,这里有一个例子:
val myUdf = ((col_abc: Column, col_xyz: Column) => {
array(
struct(
lit("x").alias("col1"),
col_abc.alias("col2"),
col_xyz.alias("col3")
)
)
}
)
我有一个 UDF 如下 -
val myUdf = udf((col_abc: String, col_xyz: String) => {
array(
struct(
lit("x").alias("col1"),
col(col_abc).alias("col2"),
col(col_xyz).alias("col3")
)
)
}
现在,我想在下面的函数中使用它 -
def myfunc(): Column = {
val myvariable = myUdf($"col_abc", $"col_xyz")
myvariable
}
然后使用此函数在我的 DataFrame 中创建一个新列
val df = df..withColumn("new_col", myfunc())
总而言之,我希望我的列“new_col”是一个值为 [[x, x, x]]
的类型数组我收到以下错误。我在这里做错了什么?
Caused by: java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
两种方式。
- 不要使用 UDF,因为您使用的是纯 Spark 函数:
val myUdf = ((col_abc: String, col_xyz: String) => {
array(
struct(
lit("x").alias("col1"),
col(col_abc).alias("col2"),
col(col_xyz).alias("col3")
)
)
}
)
def myfunc(): Column = {
val myvariable = myUdf("col_abc", "col_xyz")
myvariable
}
df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz| new_col|
+-------+-------+---------------+
| abc| xyz|[[x, abc, xyz]]|
+-------+-------+---------------+
- 使用接受字符串的 UDF 和 returns 大小写序列 class:
case class cols (col1: String, col2: String, col3: String)
val myUdf = udf((col_abc: String, col_xyz: String) => Seq(cols("x", col_abc, col_xyz)))
def myfunc(): Column = {
val myvariable = myUdf($"col_abc", $"col_xyz")
myvariable
}
df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz| new_col|
+-------+-------+---------------+
| abc| xyz|[[x, abc, xyz]]|
+-------+-------+---------------+
如果你想将 Columns 传递给函数,这里有一个例子:
val myUdf = ((col_abc: Column, col_xyz: Column) => {
array(
struct(
lit("x").alias("col1"),
col_abc.alias("col2"),
col_xyz.alias("col3")
)
)
}
)