如何在写入时强制数据集匹配其模式?
How to force dataset to match its schema upon writing?
经过一些转换后,我想使用 insertInto 将 spark 数据集保存到 parquet table。
ds.write.mode(SaveMode.Overwrite).insertInto(tablename)
但是操作失败,报错:
[TABLENAME] requires that the data to be inserted have the same number of columns as the target table: target table has 11 column(s) but the inserted data has 19 column(s)
正确的列数是 11,并且是 class 情况下的属性数对于转换(连接等的附加列)仍然存在,导致保存过程失败。
我找到的解决方法是加载 table 数据和 select 其列:
val tableSchema = spark.table(tablename).schema
val dfWithCorrectColumns = ds.select(tableSchema.fieldNames.map(col) : _*)
但这个解决方案似乎很老套。我知道 saveAsTable 将是一个具有模式强制机制的选项,但也不推荐这样做,因为缺少动态分区覆盖。
是否有一种简单的方法可以将数据集“截断”为其定义,并强制执行其模式?
如果您使用案例 class A
创建了类型 Dataset[A]
的值 ds
,那么您可以使用以下命令将其截断为您需要的字段:
val ds_clean: Dataset[A] = ds.map(identity)
经过一些转换后,我想使用 insertInto 将 spark 数据集保存到 parquet table。
ds.write.mode(SaveMode.Overwrite).insertInto(tablename)
但是操作失败,报错:
[TABLENAME] requires that the data to be inserted have the same number of columns as the target table: target table has 11 column(s) but the inserted data has 19 column(s)
正确的列数是 11,并且是 class 情况下的属性数对于转换(连接等的附加列)仍然存在,导致保存过程失败。
我找到的解决方法是加载 table 数据和 select 其列:
val tableSchema = spark.table(tablename).schema
val dfWithCorrectColumns = ds.select(tableSchema.fieldNames.map(col) : _*)
但这个解决方案似乎很老套。我知道 saveAsTable 将是一个具有模式强制机制的选项,但也不推荐这样做,因为缺少动态分区覆盖。
是否有一种简单的方法可以将数据集“截断”为其定义,并强制执行其模式?
如果您使用案例 class A
创建了类型 Dataset[A]
的值 ds
,那么您可以使用以下命令将其截断为您需要的字段:
val ds_clean: Dataset[A] = ds.map(identity)