如何在 Pyspark table 上创建修改日期列?

How to create modified date column on Pyspark table?

我需要在 Pyspark 中使用 'modifiedDate' 列创建增量 table。当行更新时,modifiedDate 的值应更改为当前时间。我考虑过制作触发器,但找不到有关在 Pyspark 中使用 delta tables 使用它们的任何信息。我该怎么办?

增量表中没有触发器这样的东西。您正在使用代码进行所有修改,并且在执行 MERGE operation 时需要使用代码来添加此列。整体工作流程可能如下所示:

  • 您的数据没有 modifiedDate
  • 您添加此列并将其设置为当前时间戳
  • 您正在使用 MERGE 操作对数据执行 UPSERT(更新现有数据和插入新数据)。为此,您需要定义一个用于标识唯一记录的主键。

类似这样(未测试)

import pyspark.sql.functions as F
from delta.tables import *

# this is your input data
df = ...
# add a column to it
input = df.withColumn("modifiedDate", F.current_timestamp())

destination = DeltaTable.forPath(spark, '<path-to-data>')
destination.alias('dest') \
  .merge(
    input.alias('updates'),
    # use 'and' if you have multiple columns in primary key
    'dest.id = updates.id'  
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()