使用 Decimal 列的 Spark DF 模式验证

Spark DF schema verification with Decimal column

我想根据从其他来源(仪表板工具)获得的架构信息验证 Spark 数据框的架构。我得到的关于 table 的信息是字段名称和字段类型(此时可为空性并不重要)。 但是,对于 DecimalType 列,我没有获得有关精度和比例(DecimalType 的两个参数)的信息。所以我不得不在比较中忽略这些值。

我目前正在重写架构,以便 Decimal 列成为 Float 列。但是有没有更优雅的方法呢?

基本上我想写一个这样工作的函数is_schema_valid()

from pyspark.sql import types as T

df_schema = T.StructType([
    T.StructField('column_1', T.StringType(), True),
    T.StructField('column_2', T.DecimalType(20,5), True), # values in DecimalType are random
])

schema_info = [('column_1', 'String'), ('column_2', 'Decimal')]

is_schema_valid(schema_info, df_schema)
# Output: True

最好比较相似的对象。您可以在 JSON 对象(或 python 字典)中转换模式。

import json 

_df_schema_dict = json.loads(df_schema.json())
df_schema_dict = {
    field["name"]: field["type"]
    for field in _df_schema_dict["fields"]
}

df_schema_dict
> {'column_1': 'string', 'column_2': 'decimal(20,5)'}

您可以使用此对象与 schema_info 进行比较。这是您可以做的一个非常基本的测试(我稍微更改了 schema_info 的内容):

import json


def is_schema_valid(schema_info, df_schema):

    df_schema_dict = {
        field["name"]: field["type"] for field in json.loads(df_schema.json())["fields"]
    }

    schema_info_dict = {elt[0]: elt[1] for elt in schema_info}

    return schema_info_dict == df_schema_dict


df_schema = T.StructType(
    [
        T.StructField("column_1", T.StringType(), True),
        T.StructField("column_2", T.DecimalType(20, 5), True),
    ]
)
schema_info = [("column_1", "string"), ("column_2", "decimal(20,5)")]

is_schema_valid(schema_info, df_schema)
# True

如果你想忽略 decimal 精度,你总是可以稍微扭曲数据帧架构。例如,将 field["type"] 替换为 field["type"] if "decimal" not in field["type"] else "decimal"

import json


def is_schema_valid(schema_info, df_schema):

    df_schema_dict = {
        field["name"]: field["type"] if "decimal" not in field["type"] else "decimal"
        for field in json.loads(df_schema.json())["fields"]
    }

    schema_info_dict = {elt[0]: elt[1] for elt in schema_info}

    return schema_info_dict == df_schema_dict


df_schema = T.StructType(
    [
        T.StructField("column_1", T.StringType(), True),
        T.StructField("column_2", T.DecimalType(20, 5), True),
    ]
)
schema_info = [("column_1", "string"), ("column_2", "decimal")]

is_schema_valid(schema_info, df_schema)
# True