pyspark 变化数据捕获实现

pyspark change data capture implementation

我有一个基地 table,它保存着实际数据。下面是 table 结构

id name address age date
A1 {"fname": "Alex", "lname": "Bhatt"} {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} 56 20201128
A2 {"fname": "Bob", "lname": "Natarajan"} {"lane": "Royd Street", "flat": ["22", "23", "27"], "pin": "123514"} 53 20201123
A1 {"fname": "Alex", "lname": "Bhattacharya"} {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} 56 20201228
A2 {"fname": "Bob", "lname": "Natarajan"} {"lane": "Royd Street", "flat": ["22", "24", "27"], "pin": "123514"} 53 20201228

在上面的 table 中,A1 和 A2 的数据发生了变化。此更改的数据摘要由另一个 table 捕获并提供。下面提到 table 结构。

id changed_field date
A1 name.lname 20201228
A2 address.flat[1] 20201228

根据上面的 2 table,我必须准备最后的 table,其中将捕获更改数据的详细信息。以下是预期的 table.

id changed_field new_value newdate old_value olddate
A1 name.lname Bhattacharya 20201228 Bhatt 20201128
A2 address.flat[1] 24 20201228 23 20201123

我已经尝试使用 spark sql 函数 get_json_object() 但它不起作用。任何建议都会很有帮助

我认为您需要创建另一个 json 列才能使用 get_json_object...请参阅下面的回答。

import pyspark.sql.functions as F

result = df1.select(
    'id',
    'date',
    F.to_json(
        F.struct(
            F.from_json('name', 'fname string, lname string').alias('name'),
            F.from_json('address', 'lane string, flat array<string>, pin string').alias('address')
        )
    ).alias('jsoncol')
).join(
    df2.withColumnRenamed('date', 'date2'), 'id'
).withColumn(
    'new_value',
    F.expr("get_json_object(jsoncol, '$.' || changed_field)")
).groupBy('id', 'changed_field').agg(
    F.array_sort(
        F.collect_list(
            F.array('date', 'new_value')
        )
    ).alias('values')
).select(
    'id',
    'changed_field',
    F.col('values')[1][1].alias('new_value'),
    F.col('values')[1][0].alias('newdate'),
    F.col('values')[0][1].alias('old_value'),
    F.col('values')[0][0].alias('olddate')
)
result.show(truncate=False)
+---+---------------+------------+--------+---------+--------+
|id |changed_field  |new_value   |newdate |old_value|olddate |
+---+---------------+------------+--------+---------+--------+
|A1 |name.lname     |Bhattacharya|20201228|Bhatt    |20201128|
|A2 |address.flat[1]|24          |20201228|23       |20201123|
+---+---------------+------------+--------+---------+--------+