使用 Decimal 列的 Spark DF 模式验证
Spark DF schema verification with Decimal column
我想根据从其他来源(仪表板工具)获得的架构信息验证 Spark 数据框的架构。我得到的关于 table 的信息是字段名称和字段类型(此时可为空性并不重要)。
但是,对于 DecimalType
列,我没有获得有关精度和比例(DecimalType 的两个参数)的信息。所以我不得不在比较中忽略这些值。
我目前正在重写架构,以便 Decimal 列成为 Float 列。但是有没有更优雅的方法呢?
基本上我想写一个这样工作的函数is_schema_valid()
:
from pyspark.sql import types as T
df_schema = T.StructType([
T.StructField('column_1', T.StringType(), True),
T.StructField('column_2', T.DecimalType(20,5), True), # values in DecimalType are random
])
schema_info = [('column_1', 'String'), ('column_2', 'Decimal')]
is_schema_valid(schema_info, df_schema)
# Output: True
最好比较相似的对象。您可以在 JSON 对象(或 python 字典)中转换模式。
import json
_df_schema_dict = json.loads(df_schema.json())
df_schema_dict = {
field["name"]: field["type"]
for field in _df_schema_dict["fields"]
}
df_schema_dict
> {'column_1': 'string', 'column_2': 'decimal(20,5)'}
您可以使用此对象与 schema_info
进行比较。这是您可以做的一个非常基本的测试(我稍微更改了 schema_info
的内容):
import json
def is_schema_valid(schema_info, df_schema):
df_schema_dict = {
field["name"]: field["type"] for field in json.loads(df_schema.json())["fields"]
}
schema_info_dict = {elt[0]: elt[1] for elt in schema_info}
return schema_info_dict == df_schema_dict
df_schema = T.StructType(
[
T.StructField("column_1", T.StringType(), True),
T.StructField("column_2", T.DecimalType(20, 5), True),
]
)
schema_info = [("column_1", "string"), ("column_2", "decimal(20,5)")]
is_schema_valid(schema_info, df_schema)
# True
如果你想忽略 decimal
精度,你总是可以稍微扭曲数据帧架构。例如,将 field["type"]
替换为 field["type"] if "decimal" not in field["type"] else "decimal"
。
import json
def is_schema_valid(schema_info, df_schema):
df_schema_dict = {
field["name"]: field["type"] if "decimal" not in field["type"] else "decimal"
for field in json.loads(df_schema.json())["fields"]
}
schema_info_dict = {elt[0]: elt[1] for elt in schema_info}
return schema_info_dict == df_schema_dict
df_schema = T.StructType(
[
T.StructField("column_1", T.StringType(), True),
T.StructField("column_2", T.DecimalType(20, 5), True),
]
)
schema_info = [("column_1", "string"), ("column_2", "decimal")]
is_schema_valid(schema_info, df_schema)
# True
我想根据从其他来源(仪表板工具)获得的架构信息验证 Spark 数据框的架构。我得到的关于 table 的信息是字段名称和字段类型(此时可为空性并不重要)。
但是,对于 DecimalType
列,我没有获得有关精度和比例(DecimalType 的两个参数)的信息。所以我不得不在比较中忽略这些值。
我目前正在重写架构,以便 Decimal 列成为 Float 列。但是有没有更优雅的方法呢?
基本上我想写一个这样工作的函数is_schema_valid()
:
from pyspark.sql import types as T
df_schema = T.StructType([
T.StructField('column_1', T.StringType(), True),
T.StructField('column_2', T.DecimalType(20,5), True), # values in DecimalType are random
])
schema_info = [('column_1', 'String'), ('column_2', 'Decimal')]
is_schema_valid(schema_info, df_schema)
# Output: True
最好比较相似的对象。您可以在 JSON 对象(或 python 字典)中转换模式。
import json
_df_schema_dict = json.loads(df_schema.json())
df_schema_dict = {
field["name"]: field["type"]
for field in _df_schema_dict["fields"]
}
df_schema_dict
> {'column_1': 'string', 'column_2': 'decimal(20,5)'}
您可以使用此对象与 schema_info
进行比较。这是您可以做的一个非常基本的测试(我稍微更改了 schema_info
的内容):
import json
def is_schema_valid(schema_info, df_schema):
df_schema_dict = {
field["name"]: field["type"] for field in json.loads(df_schema.json())["fields"]
}
schema_info_dict = {elt[0]: elt[1] for elt in schema_info}
return schema_info_dict == df_schema_dict
df_schema = T.StructType(
[
T.StructField("column_1", T.StringType(), True),
T.StructField("column_2", T.DecimalType(20, 5), True),
]
)
schema_info = [("column_1", "string"), ("column_2", "decimal(20,5)")]
is_schema_valid(schema_info, df_schema)
# True
如果你想忽略 decimal
精度,你总是可以稍微扭曲数据帧架构。例如,将 field["type"]
替换为 field["type"] if "decimal" not in field["type"] else "decimal"
。
import json
def is_schema_valid(schema_info, df_schema):
df_schema_dict = {
field["name"]: field["type"] if "decimal" not in field["type"] else "decimal"
for field in json.loads(df_schema.json())["fields"]
}
schema_info_dict = {elt[0]: elt[1] for elt in schema_info}
return schema_info_dict == df_schema_dict
df_schema = T.StructType(
[
T.StructField("column_1", T.StringType(), True),
T.StructField("column_2", T.DecimalType(20, 5), True),
]
)
schema_info = [("column_1", "string"), ("column_2", "decimal")]
is_schema_valid(schema_info, df_schema)
# True