Pyspark:如何比较两个时间戳以显示数据框或 table 中最近更新的记录?
Pyspark: How to compare two Timestamps to show most recently updated records in dataframe or table?
我正在使用 Pyspark 和 Hive table 为每日增量加载过程编写一个脚本,该脚本已经初始加载了数据。每天早上,作业都会 运行 脚本针对 table。
我一直在尝试使用 PySpark 创建一个时间戳过滤器,该过滤器将比较两个时间戳 mod_date_ts
和 max(mod_date_ts)
以显示自上次加载以来添加的更新记录并保存结果到数据框或另一个数据框。
我试过以下语法:
dw_mod_ts = base_df.select('dw_mod_ts')
max_dw_load_ts = base_df.select(max_('dw_mod_ts'))
inc_df = inc_df.filter(dw_mod_ts >= max_dw_load_ts)
但我不断收到类型错误和语法错误,指出无法将数据帧与 streven though I've casted both variables and columns as
TimestampType 进行比较。
inc_df = inc_df.filter(inc_df("dw_mod_ts").cast(DataTypes.DateType) >= max_('dw_mod_ts').cast(DataTypes.DateType))
此外,我不断收到一条错误消息,指出 >=
运算符也不能在当前语法中使用。
我没有太多使用 Pyspark 的经验,所以非常感谢大家的帮助或建议。
假设比较是字符串形式。先构建max_dw_load_ts
变量,然后将其值传给filter
得到最终结果
max_dw_mod_ts = df.groupBy().agg(F.max('dw_mod_ts')).collect()[0][0]
df = df.filter(f'dw_mod_ts >= "{max_dw_mod_ts}"')
df.show()
我正在使用 Pyspark 和 Hive table 为每日增量加载过程编写一个脚本,该脚本已经初始加载了数据。每天早上,作业都会 运行 脚本针对 table。
我一直在尝试使用 PySpark 创建一个时间戳过滤器,该过滤器将比较两个时间戳 mod_date_ts
和 max(mod_date_ts)
以显示自上次加载以来添加的更新记录并保存结果到数据框或另一个数据框。
我试过以下语法:
dw_mod_ts = base_df.select('dw_mod_ts')
max_dw_load_ts = base_df.select(max_('dw_mod_ts'))
inc_df = inc_df.filter(dw_mod_ts >= max_dw_load_ts)
但我不断收到类型错误和语法错误,指出无法将数据帧与 streven though I've casted both variables and columns as
TimestampType 进行比较。
inc_df = inc_df.filter(inc_df("dw_mod_ts").cast(DataTypes.DateType) >= max_('dw_mod_ts').cast(DataTypes.DateType))
此外,我不断收到一条错误消息,指出 >=
运算符也不能在当前语法中使用。
我没有太多使用 Pyspark 的经验,所以非常感谢大家的帮助或建议。
假设比较是字符串形式。先构建max_dw_load_ts
变量,然后将其值传给filter
得到最终结果
max_dw_mod_ts = df.groupBy().agg(F.max('dw_mod_ts')).collect()[0][0]
df = df.filter(f'dw_mod_ts >= "{max_dw_mod_ts}"')
df.show()