Pyspark delta lake json 数据类型演变问题(合并不兼容异常)

Pyspark delta lake json datatype evolution issue ( merge incompatible exception )

我正在研究 pyspark (3.x) 和 delta lake。我在数据类型方面面临一些挑战 w.r.t。 我们正在接收 JSON 数据类型的数据,我们正在对 JSON 数据集进行一些展平并将其保存为增量 tables,选项为“mergeSchema”,如下所示。我们没有在 table.

上强加任何架构
df.write\
    .format("delta")\
    .partitionBy("country","city")\
    .option("mergeSchema","true")\
    .mode("append")\
    .save(delta_path)\

我们面临的问题是 - JSON 字段的数据类型经常更改,例如在 delta table 中,“field_1” 以数据类型存储为 StringType但是 'field_1' for new JSON 的数据类型是 LongType。因此,我们得到合并不兼容的异常。

ERROR : Failed to merge fields 'field_1' and 'field_1'. Failed to merge incompatible data types StringType and LongType

如何处理增量 table 中的此类数据类型演变,我不想在字段级别处理数据类型更改,因为我们有超过 300 多个字段作为 json 的一部分。

根据文章 Diving Into Delta Lake: Schema Enforcement & Evolution 选项 mergeSchema=true 可以处理以下情况:

  • 添加新列(这是最常见的情况)
  • 从 NullType 更改数据类型 -> 任何其他类型,或从 ByteType -> ShortType -> IntegerType 向上转换

文章还提示了在您的情况下可以做什么:

“其他不符合架构演变条件的更改需要通过添加 .option("overwriteSchema", "true") 来覆盖架构和数据。例如,在列“Foo”最初是 整数数据类型,而新模式将是字符串数据类型,那么所有 Parquet(数据)文件都需要重写。这些更改包括:"

  • 删除列
  • 更改现有列的数据类型(就地)
  • 重命名仅大小写不同的列名称(例如“Foo”和“foo”)

为了解决我的问题,我编写了一个新函数,它实质上合并了增量 table(如果增量 table 存在)和 JSON 模式的模式。

在高层次上,我创建了一个新模式 - 这个新模式本质上是来自 delta lake table 的公共列和来自 JSON 字段的新列的组合,通过创建这个新的模式 我通过应用这个新模式重新创建了一个数据框。 这解决了我的问题。

def get_merged_schema(delta_table_schema, json_data_schema):
    
    print('str(len(delta_table_schema.fields)) -> ' + str(len(delta_table_schema.fields)))
    print('str(len(json_data_schema.fields)) -> '+ str(len(json_data_schema.fields)))
    
    no_commom_elements=False
    no_new_elements=False
    import numpy as np
    struct_field_array=[]
    if len(set(delta_table_schema.names).intersection(set(json_data_schema.names))) > 0:
        common_col=set(delta_table_schema.names).intersection(set(json_data_schema.names))
        print('common_col len: -> '+ str(len(common_col)))
        for name in common_col:
            for f in delta_table_schema.fields:
              if(f.name == name):
                  struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
    else:
        no_commom_elements=True
        print("no common elements")

    if len(np.setdiff1d(json_data_schema.names,delta_table_schema.names)) > 0:
        diff_list = np.setdiff1d(json_data_schema.names,delta_table_schema.names)
        print('diff_list len: -> '+ str(len(diff_list)))
        for name in diff_list:
            for f in json_data_schema.fields:
                if(f.name == name):
                    struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
    else:
        no_new_elements=True
        print("no new elements")
      
    print('len(StructType(struct_field_array)) -> '+str(len(StructType(struct_field_array))))
    df=spark.createDataFrame(spark.sparkContext.emptyRDD(),StructType(struct_field_array))
    if no_commom_elements and no_new_elements: 
      return StructType(None) 
    else: 
      return df.select(sorted(df.columns)).schema

我也采用了类似nilesh1212的方法,即手动merge schema

在我的例子中,我的脚本可以处理嵌套类型,可以在这里找到: https://github.com/miguellobato84/spark-delta-schema-evolution

另外,我写了这篇关于这个问题的文章 https://medium.com/@miguellobato84/improving-delta-lake-schema-evolution-2cce8db2f0f5