如何向 Delta Lake table 添加新列?
How to add a new column to a Delta Lake table?
我正在尝试向在 Azure Blob 存储中存储为增量 Table 的数据添加一个新列。大多数对数据执行的操作都是更新插入,有很多更新和很少的新插入。我目前写入数据的代码如下所示:
DeltaTable.forPath(spark, deltaPath)
.as("dest_table")
.merge(myDF.as("source_table"),
"dest_table.id = source_table.id")
.whenNotMatched()
.insertAll()
.whenMatched(upsertCond)
.updateExpr(upsertStat)
.execute()
从 these docs 开始,Delta Lake 似乎仅支持在 insertAll()
和 updateAll()
调用中添加新列。但是,我仅在满足某些条件时才进行更新,并希望将新列添加到所有现有数据中(默认值为 null
)。
我想出了一个看起来非常笨拙的解决方案,想知道是否有更优雅的方法。这是我目前提出的解决方案:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)
您尝试过使用合并语句吗?
https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html
先更改增量 table,然后再进行合并操作:
from pyspark.sql.functions import lit
spark.read.format("delta").load('/mnt/delta/cov')\
.withColumn("Recovered", lit(''))\
.write\
.format("delta")\
.mode("overwrite")\
.option("overwriteSchema", "true")\
.save('/mnt/delta/cov')
我正在尝试向在 Azure Blob 存储中存储为增量 Table 的数据添加一个新列。大多数对数据执行的操作都是更新插入,有很多更新和很少的新插入。我目前写入数据的代码如下所示:
DeltaTable.forPath(spark, deltaPath)
.as("dest_table")
.merge(myDF.as("source_table"),
"dest_table.id = source_table.id")
.whenNotMatched()
.insertAll()
.whenMatched(upsertCond)
.updateExpr(upsertStat)
.execute()
从 these docs 开始,Delta Lake 似乎仅支持在 insertAll()
和 updateAll()
调用中添加新列。但是,我仅在满足某些条件时才进行更新,并希望将新列添加到所有现有数据中(默认值为 null
)。
我想出了一个看起来非常笨拙的解决方案,想知道是否有更优雅的方法。这是我目前提出的解决方案:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)
您尝试过使用合并语句吗?
https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html
先更改增量 table,然后再进行合并操作:
from pyspark.sql.functions import lit
spark.read.format("delta").load('/mnt/delta/cov')\
.withColumn("Recovered", lit(''))\
.write\
.format("delta")\
.mode("overwrite")\
.option("overwriteSchema", "true")\
.save('/mnt/delta/cov')