如何在 Pyspark table 上创建修改日期列?
How to create modified date column on Pyspark table?
我需要在 Pyspark 中使用 'modifiedDate' 列创建增量 table。当行更新时,modifiedDate 的值应更改为当前时间。我考虑过制作触发器,但找不到有关在 Pyspark 中使用 delta tables 使用它们的任何信息。我该怎么办?
增量表中没有触发器这样的东西。您正在使用代码进行所有修改,并且在执行 MERGE operation 时需要使用代码来添加此列。整体工作流程可能如下所示:
- 您的数据没有
modifiedDate
列
- 您添加此列并将其设置为当前时间戳
- 您正在使用 MERGE 操作对数据执行 UPSERT(更新现有数据和插入新数据)。为此,您需要定义一个用于标识唯一记录的主键。
类似这样(未测试)
import pyspark.sql.functions as F
from delta.tables import *
# this is your input data
df = ...
# add a column to it
input = df.withColumn("modifiedDate", F.current_timestamp())
destination = DeltaTable.forPath(spark, '<path-to-data>')
destination.alias('dest') \
.merge(
input.alias('updates'),
# use 'and' if you have multiple columns in primary key
'dest.id = updates.id'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
我需要在 Pyspark 中使用 'modifiedDate' 列创建增量 table。当行更新时,modifiedDate 的值应更改为当前时间。我考虑过制作触发器,但找不到有关在 Pyspark 中使用 delta tables 使用它们的任何信息。我该怎么办?
增量表中没有触发器这样的东西。您正在使用代码进行所有修改,并且在执行 MERGE operation 时需要使用代码来添加此列。整体工作流程可能如下所示:
- 您的数据没有
modifiedDate
列 - 您添加此列并将其设置为当前时间戳
- 您正在使用 MERGE 操作对数据执行 UPSERT(更新现有数据和插入新数据)。为此,您需要定义一个用于标识唯一记录的主键。
类似这样(未测试)
import pyspark.sql.functions as F
from delta.tables import *
# this is your input data
df = ...
# add a column to it
input = df.withColumn("modifiedDate", F.current_timestamp())
destination = DeltaTable.forPath(spark, '<path-to-data>')
destination.alias('dest') \
.merge(
input.alias('updates'),
# use 'and' if you have multiple columns in primary key
'dest.id = updates.id'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()