Pyspark delta lake json 数据类型演变问题(合并不兼容异常)
Pyspark delta lake json datatype evolution issue ( merge incompatible exception )
我正在研究 pyspark (3.x) 和 delta lake。我在数据类型方面面临一些挑战 w.r.t。
我们正在接收 JSON 数据类型的数据,我们正在对 JSON 数据集进行一些展平并将其保存为增量 tables,选项为“mergeSchema”,如下所示。我们没有在 table.
上强加任何架构
df.write\
.format("delta")\
.partitionBy("country","city")\
.option("mergeSchema","true")\
.mode("append")\
.save(delta_path)\
我们面临的问题是 - JSON 字段的数据类型经常更改,例如在 delta table 中,“field_1” 以数据类型存储为 StringType但是 'field_1' for new JSON 的数据类型是 LongType。因此,我们得到合并不兼容的异常。
ERROR : Failed to merge fields 'field_1' and 'field_1'. Failed to merge incompatible data types StringType and LongType
如何处理增量 table 中的此类数据类型演变,我不想在字段级别处理数据类型更改,因为我们有超过 300 多个字段作为 json 的一部分。
根据文章 Diving Into Delta Lake: Schema Enforcement & Evolution 选项 mergeSchema=true
可以处理以下情况:
- 添加新列(这是最常见的情况)
- 从 NullType 更改数据类型 -> 任何其他类型,或从 ByteType -> ShortType -> IntegerType 向上转换
文章还提示了在您的情况下可以做什么:
“其他不符合架构演变条件的更改需要通过添加 .option("overwriteSchema", "true")
来覆盖架构和数据。例如,在列“Foo”最初是 整数数据类型,而新模式将是字符串数据类型,那么所有 Parquet(数据)文件都需要重写。这些更改包括:"
- 删除列
- 更改现有列的数据类型(就地)
- 重命名仅大小写不同的列名称(例如“Foo”和“foo”)
为了解决我的问题,我编写了一个新函数,它实质上合并了增量 table(如果增量 table 存在)和 JSON 模式的模式。
在高层次上,我创建了一个新模式 - 这个新模式本质上是来自 delta lake table 的公共列和来自 JSON 字段的新列的组合,通过创建这个新的模式 我通过应用这个新模式重新创建了一个数据框。
这解决了我的问题。
def get_merged_schema(delta_table_schema, json_data_schema):
print('str(len(delta_table_schema.fields)) -> ' + str(len(delta_table_schema.fields)))
print('str(len(json_data_schema.fields)) -> '+ str(len(json_data_schema.fields)))
no_commom_elements=False
no_new_elements=False
import numpy as np
struct_field_array=[]
if len(set(delta_table_schema.names).intersection(set(json_data_schema.names))) > 0:
common_col=set(delta_table_schema.names).intersection(set(json_data_schema.names))
print('common_col len: -> '+ str(len(common_col)))
for name in common_col:
for f in delta_table_schema.fields:
if(f.name == name):
struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
else:
no_commom_elements=True
print("no common elements")
if len(np.setdiff1d(json_data_schema.names,delta_table_schema.names)) > 0:
diff_list = np.setdiff1d(json_data_schema.names,delta_table_schema.names)
print('diff_list len: -> '+ str(len(diff_list)))
for name in diff_list:
for f in json_data_schema.fields:
if(f.name == name):
struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
else:
no_new_elements=True
print("no new elements")
print('len(StructType(struct_field_array)) -> '+str(len(StructType(struct_field_array))))
df=spark.createDataFrame(spark.sparkContext.emptyRDD(),StructType(struct_field_array))
if no_commom_elements and no_new_elements:
return StructType(None)
else:
return df.select(sorted(df.columns)).schema
我也采用了类似nilesh1212的方法,即手动merge schema
在我的例子中,我的脚本可以处理嵌套类型,可以在这里找到:
https://github.com/miguellobato84/spark-delta-schema-evolution
另外,我写了这篇关于这个问题的文章
https://medium.com/@miguellobato84/improving-delta-lake-schema-evolution-2cce8db2f0f5
我正在研究 pyspark (3.x) 和 delta lake。我在数据类型方面面临一些挑战 w.r.t。 我们正在接收 JSON 数据类型的数据,我们正在对 JSON 数据集进行一些展平并将其保存为增量 tables,选项为“mergeSchema”,如下所示。我们没有在 table.
上强加任何架构df.write\
.format("delta")\
.partitionBy("country","city")\
.option("mergeSchema","true")\
.mode("append")\
.save(delta_path)\
我们面临的问题是 - JSON 字段的数据类型经常更改,例如在 delta table 中,“field_1” 以数据类型存储为 StringType但是 'field_1' for new JSON 的数据类型是 LongType。因此,我们得到合并不兼容的异常。
ERROR : Failed to merge fields 'field_1' and 'field_1'. Failed to merge incompatible data types StringType and LongType
如何处理增量 table 中的此类数据类型演变,我不想在字段级别处理数据类型更改,因为我们有超过 300 多个字段作为 json 的一部分。
根据文章 Diving Into Delta Lake: Schema Enforcement & Evolution 选项 mergeSchema=true
可以处理以下情况:
- 添加新列(这是最常见的情况)
- 从 NullType 更改数据类型 -> 任何其他类型,或从 ByteType -> ShortType -> IntegerType 向上转换
文章还提示了在您的情况下可以做什么:
“其他不符合架构演变条件的更改需要通过添加 .option("overwriteSchema", "true")
来覆盖架构和数据。例如,在列“Foo”最初是 整数数据类型,而新模式将是字符串数据类型,那么所有 Parquet(数据)文件都需要重写。这些更改包括:"
- 删除列
- 更改现有列的数据类型(就地)
- 重命名仅大小写不同的列名称(例如“Foo”和“foo”)
为了解决我的问题,我编写了一个新函数,它实质上合并了增量 table(如果增量 table 存在)和 JSON 模式的模式。
在高层次上,我创建了一个新模式 - 这个新模式本质上是来自 delta lake table 的公共列和来自 JSON 字段的新列的组合,通过创建这个新的模式 我通过应用这个新模式重新创建了一个数据框。 这解决了我的问题。
def get_merged_schema(delta_table_schema, json_data_schema):
print('str(len(delta_table_schema.fields)) -> ' + str(len(delta_table_schema.fields)))
print('str(len(json_data_schema.fields)) -> '+ str(len(json_data_schema.fields)))
no_commom_elements=False
no_new_elements=False
import numpy as np
struct_field_array=[]
if len(set(delta_table_schema.names).intersection(set(json_data_schema.names))) > 0:
common_col=set(delta_table_schema.names).intersection(set(json_data_schema.names))
print('common_col len: -> '+ str(len(common_col)))
for name in common_col:
for f in delta_table_schema.fields:
if(f.name == name):
struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
else:
no_commom_elements=True
print("no common elements")
if len(np.setdiff1d(json_data_schema.names,delta_table_schema.names)) > 0:
diff_list = np.setdiff1d(json_data_schema.names,delta_table_schema.names)
print('diff_list len: -> '+ str(len(diff_list)))
for name in diff_list:
for f in json_data_schema.fields:
if(f.name == name):
struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
else:
no_new_elements=True
print("no new elements")
print('len(StructType(struct_field_array)) -> '+str(len(StructType(struct_field_array))))
df=spark.createDataFrame(spark.sparkContext.emptyRDD(),StructType(struct_field_array))
if no_commom_elements and no_new_elements:
return StructType(None)
else:
return df.select(sorted(df.columns)).schema
我也采用了类似nilesh1212的方法,即手动merge schema
在我的例子中,我的脚本可以处理嵌套类型,可以在这里找到: https://github.com/miguellobato84/spark-delta-schema-evolution
另外,我写了这篇关于这个问题的文章 https://medium.com/@miguellobato84/improving-delta-lake-schema-evolution-2cce8db2f0f5