pyspark 变化数据捕获实现
pyspark change data capture implementation
我有一个基地 table,它保存着实际数据。下面是 table 结构
id
name
address
age
date
A1
{"fname": "Alex", "lname": "Bhatt"}
{"lane": "Mac Street", "flat": ["24", "26", "27", "29"]}
56
20201128
A2
{"fname": "Bob", "lname": "Natarajan"}
{"lane": "Royd Street", "flat": ["22", "23", "27"], "pin": "123514"}
53
20201123
A1
{"fname": "Alex", "lname": "Bhattacharya"}
{"lane": "Mac Street", "flat": ["24", "26", "27", "29"]}
56
20201228
A2
{"fname": "Bob", "lname": "Natarajan"}
{"lane": "Royd Street", "flat": ["22", "24", "27"], "pin": "123514"}
53
20201228
在上面的 table 中,A1 和 A2 的数据发生了变化。此更改的数据摘要由另一个 table 捕获并提供。下面提到 table 结构。
id
changed_field
date
A1
name.lname
20201228
A2
address.flat[1]
20201228
根据上面的 2 table,我必须准备最后的 table,其中将捕获更改数据的详细信息。以下是预期的 table.
id
changed_field
new_value
newdate
old_value
olddate
A1
name.lname
Bhattacharya
20201228
Bhatt
20201128
A2
address.flat[1]
24
20201228
23
20201123
我已经尝试使用 spark sql 函数 get_json_object() 但它不起作用。任何建议都会很有帮助
我认为您需要创建另一个 json 列才能使用 get_json_object
...请参阅下面的回答。
import pyspark.sql.functions as F
result = df1.select(
'id',
'date',
F.to_json(
F.struct(
F.from_json('name', 'fname string, lname string').alias('name'),
F.from_json('address', 'lane string, flat array<string>, pin string').alias('address')
)
).alias('jsoncol')
).join(
df2.withColumnRenamed('date', 'date2'), 'id'
).withColumn(
'new_value',
F.expr("get_json_object(jsoncol, '$.' || changed_field)")
).groupBy('id', 'changed_field').agg(
F.array_sort(
F.collect_list(
F.array('date', 'new_value')
)
).alias('values')
).select(
'id',
'changed_field',
F.col('values')[1][1].alias('new_value'),
F.col('values')[1][0].alias('newdate'),
F.col('values')[0][1].alias('old_value'),
F.col('values')[0][0].alias('olddate')
)
result.show(truncate=False)
+---+---------------+------------+--------+---------+--------+
|id |changed_field |new_value |newdate |old_value|olddate |
+---+---------------+------------+--------+---------+--------+
|A1 |name.lname |Bhattacharya|20201228|Bhatt |20201128|
|A2 |address.flat[1]|24 |20201228|23 |20201123|
+---+---------------+------------+--------+---------+--------+
我有一个基地 table,它保存着实际数据。下面是 table 结构
id | name | address | age | date |
---|---|---|---|---|
A1 | {"fname": "Alex", "lname": "Bhatt"} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201128 |
A2 | {"fname": "Bob", "lname": "Natarajan"} | {"lane": "Royd Street", "flat": ["22", "23", "27"], "pin": "123514"} | 53 | 20201123 |
A1 | {"fname": "Alex", "lname": "Bhattacharya"} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201228 |
A2 | {"fname": "Bob", "lname": "Natarajan"} | {"lane": "Royd Street", "flat": ["22", "24", "27"], "pin": "123514"} | 53 | 20201228 |
在上面的 table 中,A1 和 A2 的数据发生了变化。此更改的数据摘要由另一个 table 捕获并提供。下面提到 table 结构。
id | changed_field | date |
---|---|---|
A1 | name.lname | 20201228 |
A2 | address.flat[1] | 20201228 |
根据上面的 2 table,我必须准备最后的 table,其中将捕获更改数据的详细信息。以下是预期的 table.
id | changed_field | new_value | newdate | old_value | olddate |
---|---|---|---|---|---|
A1 | name.lname | Bhattacharya | 20201228 | Bhatt | 20201128 |
A2 | address.flat[1] | 24 | 20201228 | 23 | 20201123 |
我已经尝试使用 spark sql 函数 get_json_object() 但它不起作用。任何建议都会很有帮助
我认为您需要创建另一个 json 列才能使用 get_json_object
...请参阅下面的回答。
import pyspark.sql.functions as F
result = df1.select(
'id',
'date',
F.to_json(
F.struct(
F.from_json('name', 'fname string, lname string').alias('name'),
F.from_json('address', 'lane string, flat array<string>, pin string').alias('address')
)
).alias('jsoncol')
).join(
df2.withColumnRenamed('date', 'date2'), 'id'
).withColumn(
'new_value',
F.expr("get_json_object(jsoncol, '$.' || changed_field)")
).groupBy('id', 'changed_field').agg(
F.array_sort(
F.collect_list(
F.array('date', 'new_value')
)
).alias('values')
).select(
'id',
'changed_field',
F.col('values')[1][1].alias('new_value'),
F.col('values')[1][0].alias('newdate'),
F.col('values')[0][1].alias('old_value'),
F.col('values')[0][0].alias('olddate')
)
result.show(truncate=False)
+---+---------------+------------+--------+---------+--------+
|id |changed_field |new_value |newdate |old_value|olddate |
+---+---------------+------------+--------+---------+--------+
|A1 |name.lname |Bhattacharya|20201228|Bhatt |20201128|
|A2 |address.flat[1]|24 |20201228|23 |20201123|
+---+---------------+------------+--------+---------+--------+