将 PySpark 数据帧转换为 Delta Table
Converting PySpark dataframe to a Delta Table
我在 AWS Glue 环境中工作。我从 Glue 目录中读取数据作为动态数据框,并将其转换为 Pyspark 数据框以进行自定义转换。
要插入 new/updated 数据,我打算使用 delta tables.
但我只找到从路径中以增量 table 形式读取数据的选项。我需要将我的 Pyspark 数据帧转换为 Delta table 以进行合并操作。
有什么办法吗?
您只需要一个目的地 table 作为 Delta table。您计划合并到的数据不需要是 Delta table。这实际上取决于 API 您使用的是什么:
- 如果您正在使用 Python API,那么您可以按原样使用数据框(示例基于 docs):
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
updatesDF = .... # your transformed dataframe
deltaTable.alias("target").merge(
updatesDF.alias("updates"),
"target.col1 = updates.col1") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
- 如果您正在使用 SQL MERGE 命令 - 您只需为数据框注册临时视图,并将其用作 MERGE SQL 命令的输入:
updates_df.createOrReplaceTempView(updates)
merge_sql = f"""
merge into target
using updates
ON source.col1 == target.col1
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
updates_df._jdf.sparkSession().sql(merge_sql)
这里唯一的问题是您需要使用 df._jdf.sparkSession().sql
在注册临时视图的同一上下文中执行 SQL 命令。
我在 AWS Glue 环境中工作。我从 Glue 目录中读取数据作为动态数据框,并将其转换为 Pyspark 数据框以进行自定义转换。 要插入 new/updated 数据,我打算使用 delta tables.
但我只找到从路径中以增量 table 形式读取数据的选项。我需要将我的 Pyspark 数据帧转换为 Delta table 以进行合并操作。 有什么办法吗?
您只需要一个目的地 table 作为 Delta table。您计划合并到的数据不需要是 Delta table。这实际上取决于 API 您使用的是什么:
- 如果您正在使用 Python API,那么您可以按原样使用数据框(示例基于 docs):
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
updatesDF = .... # your transformed dataframe
deltaTable.alias("target").merge(
updatesDF.alias("updates"),
"target.col1 = updates.col1") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
- 如果您正在使用 SQL MERGE 命令 - 您只需为数据框注册临时视图,并将其用作 MERGE SQL 命令的输入:
updates_df.createOrReplaceTempView(updates)
merge_sql = f"""
merge into target
using updates
ON source.col1 == target.col1
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
updates_df._jdf.sparkSession().sql(merge_sql)
这里唯一的问题是您需要使用 df._jdf.sparkSession().sql
在注册临时视图的同一上下文中执行 SQL 命令。