通过提供 TABLE NAME 而不是 TABLE PATH 将 spark Dataframe 写入现有的 Delta Table
Write spark Dataframe to an exisitng Delta Table by providing TABLE NAME instead of TABLE PATH
我正在尝试将 spark 数据帧写入现有的增量 table。
我确实有多种情况可以将数据保存到不同的 table 中,如下所示。
情景 01:
我有一个现有的增量 table,我必须使用选项 mergeSchema
将数据帧写入该 table,因为模式可能会因每次加载而改变。
我通过提供 delta table path
用下面的命令做同样的事情
finalDF01.write.format("delta").option("mergeSchema", "true").mode("append") \
.partitionBy("part01","part02").save(finalDF01DestFolderPath)
只是想知道这是否可以通过提供现有的 delta TABLE NAME 而不是 delta PATH 来完成。
已通过如下更新数据写入命令解决此问题。
finalDF01.write.format("delta").option("mergeSchema", "true").mode("append") \
.partitionBy("part01","part02").saveAsTable(finalDF01DestTableName)
- 这是正确的方法吗?
情景 02:
如果记录已经存在,我必须更新现有的 table,如果不存在,则插入新记录。
为此,我目前正在做如下所示。
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
DeltaTable.forPath(DestFolderPath)
.as("t")
.merge(
finalDataFrame.as("s"),
"t.id = s.id AND t.name= s.name")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
我尝试使用以下脚本。
destMasterTable.as("t")
.merge(
vehMasterDf.as("s"),
"t.id = s.id")
.whenNotMatched().insertAll()
.execute()
但低于错误(即使使用 alias
而不是 as
)。
error: value as is not a member of String
destMasterTable.as("t")
- 这里我也使用 delta table 路径作为目的地,有什么办法可以让我们提供 delta TABLE NAME 而不是 TABLE 路径?
最好提供 TABLE NAME 而不是 TABLE PATH,以防万一稍后更改 table 路径不会影响代码。
我在 databricks 文档中没有看到任何地方提供 table 名称以及 mergeSchema
和 autoMerge
。
可以这样做吗?
要将现有数据用作 table 而不是路径,您需要从一开始就使用 saveAsTable
,或者使用 SQL 在 Hive 元存储中注册现有数据命令 CREATE TABLE USING,像这样(语法可能略有不同,具体取决于您是 运行 on Databricks 还是 OSS Spark,并且取决于 Spark 的版本):
CREATE TABLE IF NOT EXISTS my_table
USING delta
LOCATION 'path_to_existing_data'
之后,您可以使用saveAsTable
。
对于第二个问题 - 看起来 destMasterTable
只是一个 String
。要引用现有 table,您需要使用 DeltaTable
对象 (doc) 中的函数 forName
:
DeltaTable.forName(destMasterTable)
.as("t")
...
我正在尝试将 spark 数据帧写入现有的增量 table。
我确实有多种情况可以将数据保存到不同的 table 中,如下所示。
情景 01:
我有一个现有的增量 table,我必须使用选项 mergeSchema
将数据帧写入该 table,因为模式可能会因每次加载而改变。
我通过提供 delta table path
用下面的命令做同样的事情finalDF01.write.format("delta").option("mergeSchema", "true").mode("append") \
.partitionBy("part01","part02").save(finalDF01DestFolderPath)
只是想知道这是否可以通过提供现有的 delta TABLE NAME 而不是 delta PATH 来完成。
已通过如下更新数据写入命令解决此问题。
finalDF01.write.format("delta").option("mergeSchema", "true").mode("append") \
.partitionBy("part01","part02").saveAsTable(finalDF01DestTableName)
- 这是正确的方法吗?
情景 02:
如果记录已经存在,我必须更新现有的 table,如果不存在,则插入新记录。 为此,我目前正在做如下所示。
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
DeltaTable.forPath(DestFolderPath)
.as("t")
.merge(
finalDataFrame.as("s"),
"t.id = s.id AND t.name= s.name")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
我尝试使用以下脚本。
destMasterTable.as("t")
.merge(
vehMasterDf.as("s"),
"t.id = s.id")
.whenNotMatched().insertAll()
.execute()
但低于错误(即使使用 alias
而不是 as
)。
error: value as is not a member of String
destMasterTable.as("t")
- 这里我也使用 delta table 路径作为目的地,有什么办法可以让我们提供 delta TABLE NAME 而不是 TABLE 路径?
最好提供 TABLE NAME 而不是 TABLE PATH,以防万一稍后更改 table 路径不会影响代码。
我在 databricks 文档中没有看到任何地方提供 table 名称以及 mergeSchema
和 autoMerge
。
可以这样做吗?
要将现有数据用作 table 而不是路径,您需要从一开始就使用 saveAsTable
,或者使用 SQL 在 Hive 元存储中注册现有数据命令 CREATE TABLE USING,像这样(语法可能略有不同,具体取决于您是 运行 on Databricks 还是 OSS Spark,并且取决于 Spark 的版本):
CREATE TABLE IF NOT EXISTS my_table
USING delta
LOCATION 'path_to_existing_data'
之后,您可以使用saveAsTable
。
对于第二个问题 - 看起来 destMasterTable
只是一个 String
。要引用现有 table,您需要使用 DeltaTable
对象 (doc) 中的函数 forName
:
DeltaTable.forName(destMasterTable)
.as("t")
...