使用 UDF 在 Spark DataFrame 中创建一个新列

Create a new column in Spark DataFrame using UDF

我有一个 UDF 如下 -

  val myUdf = udf((col_abc: String, col_xyz: String) => {
    array(
      struct(
        lit("x").alias("col1"),
        col(col_abc).alias("col2"),
        col(col_xyz).alias("col3")
      )
    )
  }

现在,我想在下面的函数中使用它 -

def myfunc(): Column = {
    val myvariable = myUdf($"col_abc", $"col_xyz")
    myvariable
}

然后使用此函数在我的 DataFrame 中创建一个新列

val df = df..withColumn("new_col", myfunc())

总而言之,我希望我的列“new_col”是一个值为 [[x, x, x]]

的类型数组

我收到以下错误。我在这里做错了什么?

Caused by: java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported

两种方式。

  1. 不要使用 UDF,因为您使用的是纯 Spark 函数:
val myUdf = ((col_abc: String, col_xyz: String) => {
    array(
      struct(
        lit("x").alias("col1"),
        col(col_abc).alias("col2"),
        col(col_xyz).alias("col3")
      )
    )
  }
)

def myfunc(): Column = {
    val myvariable = myUdf("col_abc", "col_xyz")
    myvariable
}

df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz|        new_col|
+-------+-------+---------------+
|    abc|    xyz|[[x, abc, xyz]]|
+-------+-------+---------------+
  1. 使用接受字符串的 UDF 和 returns 大小写序列 class:
case class cols (col1: String, col2: String, col3: String)

val myUdf = udf((col_abc: String, col_xyz: String) => Seq(cols("x", col_abc, col_xyz)))

def myfunc(): Column = {
    val myvariable = myUdf($"col_abc", $"col_xyz")
    myvariable
}

df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz|        new_col|
+-------+-------+---------------+
|    abc|    xyz|[[x, abc, xyz]]|
+-------+-------+---------------+

如果你想将 Columns 传递给函数,这里有一个例子:

val myUdf = ((col_abc: Column, col_xyz: Column) => {
    array(
      struct(
        lit("x").alias("col1"),
        col_abc.alias("col2"),
        col_xyz.alias("col3")
      )
    )
  }
)