如何就地修改数据框,使其 ArrayType 列不能为空(nullable = false 和 containsNull = false)?

How to modify a dataframe in-place so that its ArrayType column can't be null (nullable = false and containsNull = false)?

采用以下示例数据框:

val df = Seq(Seq("xxx")).toDF("a")

架构:

root
 |-- a: array (nullable = true)
 |    |-- element: string (containsNull = true)

如何就地修改 df 以便生成的数据框在任何地方都不可为空,即具有以下架构:

root
 |-- a: array (nullable = false)
 |    |-- element: string (containsNull = false)

我知道我可以重新创建另一个数据框来强制执行不可为 null 的模式,例如遵循

spark.createDataFrame(df.rdd, StructType(StructField("a", ArrayType(StringType, false), false) :: Nil))

但这不是结构化流媒体下的一个选项,所以我希望它是某种就地修改。

所以实现这一点的方法是 UserDefinedFunction

// Problem setup
val df = Seq(Seq("xxx")).toDF("a")

df.printSchema
root
|-- a: array (nullable = true)
|    |-- element: string (containsNull = true)

解决方案:

import org.apache.spark.sql.types.{ArrayType, StringType}
import org.apache.spark.sql.functions.{udf, col}

// We define a sub schema with the appropriate data type and null condition
val subSchema = ArrayType(StringType, containsNull = false)

// We create a UDF that applies this sub schema
// while specifying the output of the UDF to be non-nullable
val applyNonNullableSchemaUdf =  udf((x:Seq[String]) => x, subSchema).asNonNullable

// We apply the UDF
val newSchemaDF = df.withColumn("a", applyNonNullableSchemaUdf(col("a")))

给你了。

// Check new schema
newSchemaDF.printSchema
root
|-- a: array (nullable = false)
|    |-- element: string (containsNull = false)

// Check that it actually works
newSchemaDF.show
+-----+
|    a|
+-----+
|[xxx]|
+-----+