如何复制源 Spark Dataframe 模式的可空性状态并将其强制到目标 Spark Dataframe?

How can I copy the nullability state of a source Spark Dataframe schema and force it onto a target Spark Dataframe?

我正在使用 Databricks。假设我有两个 Spark Dataframes(我正在使用 PySpark):

如果 df_source 具有以下架构:

root
 |-- name: string (nullable = false)
 |-- id: long (nullable = false)
 |-- age: long (nullable = true)

并且 df_target 具有以下架构:

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- age: long (nullable = false)

我如何有效地创建另一个 Dataframe,df_final 其中 df_source 中的 (nullable = true/false) 属性 可以被强制到 df_target ?

我已尝试执行以下操作:

df_final = spark.createDataFrame(df_target.rdd, schema = df_source.schema)

通过这种方法,我能够达到预期的结果,但对于我拥有的数据集大小来说似乎需要花费很长时间。对于较小的数据集,它工作正常。对于较大的数据集,使用 collect() 函数而不是 rdd 转换显然更糟糕。

我想指出,我在这里唯一想做的就是从源模式中复制可空性部分,并在目标中相应地更改它,以获得最终的数据帧。

有没有办法进行某种类型的可空性转换,其工作方式类似于 .withColumn() 的性能,无需 RDD 转换,无需在代码中明确指定列名?列顺序已在源和目标之间对齐。

附加上下文:我需要这样做的原因是因为我需要将df_final写入(附加)到Google BigQuery table 使用 Spark BQ 连接器。因此,即使我的 Spark Dataframe 的列中没有任何空值,但可空性 属性 设置为 true,BigQuery table 也会拒绝写入操作,因为 BigQuery table 可能将可为空的 属性 设置为 false,并且架构不匹配。

由于您知道 age 不能为空,因此您可以 coalesce 年龄和常量文字来创建不可为空的字段。对于 nullable 字段必须从 false 转换为 true 的字段,可以使用 when 表达式。

from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import functions as F

df_source_schema = StructType([
    StructField("name", StringType(), False),
    StructField("id", LongType(), False),
    StructField("age", LongType(), True),
])

df_target_schema = StructType([
    StructField("name", StringType(), True),
    StructField("id", LongType(), False),
    StructField("age", LongType(), False),
])

df_source = spark.createDataFrame([("a", 1, 18, ), ], df_source_schema)
df_source.printSchema()
"""
root
 |-- name: string (nullable = false)
 |-- id: long (nullable = false)
 |-- age: long (nullable = true)
"""

df_target = spark.createDataFrame([("a", 1, 18), ], df_target_schema)
df_target.printSchema()
"""
root
 |-- name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- age: long (nullable = false)
"""

# Construct selection expression based on the logic described above

target_field_nullable_map = {field.name: field.nullable for field in df_target.schema}

selection_expr = []

for src_field in df_source.schema:
    field_name = src_field.name
    field_type = src_field.dataType
    
    if target_field_nullable_map[field_name] != src_field.nullable:
        if src_field.nullable:
            selection_expr.append(F.when(F.col(field_name).isNotNull(), F.col(field_name)).otherwise(F.lit(None)).alias(field_name))
        else:
            selection_expr.append(F.coalesce(F.col(field_name), F.lit("-1").cast(field_type)).alias(field_name))
    else:
        selection_expr.append(F.col(field_name))

df_final = df_target.select(*selection_expr)
df_final.printSchema()
"""
root
 |-- name: string (nullable = false)
 |-- id: long (nullable = false)
 |-- age: long (nullable = true)
"""

为什么这样做有效?

对于要成为 nullcoalesce 表达式,从 here 可以看出,它的所有子表达式都必须为 null。 因为当 value != null coalesce 导致不可为空的列时 lit 是一个非空表达式。

A when 表达式可以为空,如果 else 表达式可以为空,如所述 here.