Updating a JSON column using a cumulative window in PySpark
I've been trying to update a series of JSON blobs via PySpark, SparkSQL, and Pandas, with no success. The data looks like this:
#+---+---------+------------------------------------------+
#|ID |Timestamp|Properties |
#+---+---------+------------------------------------------+
#|a |7 |{"a1": 5, "a2": 8} |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|
#|a |8 |{"a2": 4} |
#|a |10 |{"a3": "z", "a4": "t"} |
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |
#|b |20 |{"b2": "k", "b3": 9} |
#|b |14 |{"b8": "y", "b3": 2} |
#+---+---------+------------------------------------------+
I want a query that partitions the rows by the ID field and orders them by the Timestamp field. Within each partition, the Properties field should then be cumulatively merged to create a new column, New_Props. So the output would look like this:
#+---+---------+------------------------------------------+------------------------------------------+------+
#|ID |Timestamp|Properties |New_Props |rownum|
#+---+---------+------------------------------------------+------------------------------------------+------+
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |{"a1": 3, "a2": 12, "a4": "r"} |1 |
#|a |7 |{"a1": 5, "a2": 8} |{"a1": 5, "a2": 8, "a4": "r"} |2 |
#|a |8 |{"a2": 4} |{"a1": 5, "a2": 4, "a4": "r"} |3 |
#|a |10 |{"a3": "z", "a4": "t"} |{"a1": 5, "a2": 4, "a3": "z", "a4": "t"} |4 |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1 |
#|b |14 |{"b8": "y", "b3": 2} |{"b1": 36, "b2": "u", "b3": 2, "b8": "y"} |2 |
#|b |20 |{"b2": "k", "b3": 9} |{"b1": 36, "b2": "k", "b3": 9, "b8": "y"} |3 |
#+---+---------+------------------------------------------+------------------------------------------+------+
The formula: starting at rownum 2, take the New_Props column value from the previous row (rownum 1) and update it with the Properties column value of the current row (rownum 2).
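For example, for ID a, updating rownum 1's {"a1": 3, "a2": 12, "a4": "r"} with rownum 2's {"a1": 5, "a2": 8} yields {"a1": 5, "a2": 8, "a4": "r"}, the New_Props value shown on rownum 2.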
I tried using the LAG function, but I can't reference the column that I'm computing inside the function itself.
To create the Next_Props column, I tried this CASE statement, but it doesn't work:
CASE
WHEN rownum != 1 THEN concat(properties, LAG(next_props, 1) OVER (PARTITION BY contentid ORDER BY updateddatetime))
ELSE next_props
END AS new_props
I've been trying different things for a while now, but I'm stuck. I could probably do this with a for loop and Python's dict.update() function, but I'm worried about efficiency. Thanks for your help.
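For reference, a minimal sketch of the driver-side for loop / dict.update() approach alluded to above (it collects everything to the driver, which is exactly the efficiency concern; variable names are illustrative):

import json
from collections import defaultdict

# Sort by ID and Timestamp, then merge each row's JSON into a running dict per ID.
acc = defaultdict(dict)
for row in sorted(df.collect(), key=lambda r: (r["ID"], r["Timestamp"])):
    acc[row["ID"]].update(json.loads(row["Properties"]))
    print(row["ID"], row["Timestamp"], json.dumps(acc[row["ID"]]))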
Here's one way using higher-order functions on array and map columns:

- Use lag to get the previous row's Properties for each row, and convert both the previous and the current Properties to map type
- Use the collect_list function over the window to get the cumulative array of the previous rows' Properties maps
- Prepend the current row's Properties to the resulting array, and use aggregate to concatenate the inner maps with map_concat. From your example, the update operation seems to simply add new keys, so before concatenating we filter out the keys that already exist using the map_filter function
- Use to_json to get a JSON string from the aggregated map, and drop the intermediate columns
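For completeness, a minimal sketch of how the sample DataFrame df used below could be created (assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Build the sample data from the question; Properties is a JSON string column.
df = spark.createDataFrame(
    [("a", 7, '{"a1": 5, "a2": 8}'),
     ("b", 12, '{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}'),
     ("a", 8, '{"a2": 4}'),
     ("a", 10, '{"a3": "z", "a4": "t"}'),
     ("a", 5, '{"a1": 3, "a2": 12, "a4": "r"}'),
     ("b", 20, '{"b2": "k", "b3": 9}'),
     ("b", 14, '{"b8": "y", "b3": 2}')],
    ["ID", "Timestamp", "Properties"],
)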
from pyspark.sql import functions as F, Window

w = Window.partitionBy("ID").orderBy("Timestamp")

df1 = df.withColumn("rownum", F.row_number().over(w)) \
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
    .withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
    .withColumn(
        "New_Props",
        F.to_json(F.aggregate(
            # current map first, then previous maps from newest to oldest,
            # so the first occurrence of each key is its most recent value
            F.concat(F.array("current_prop_map"), F.reverse(F.col("cumulative_prev_props"))),
            F.expr("cast(map() as map<string,string>)"),
            lambda acc, x: F.map_concat(
                acc,
                # only add keys that are not already in the accumulator
                F.map_filter(x, lambda k, _: ~F.array_contains(F.map_keys(acc), k))
            )
        ))
    ).drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")
df1.show(truncate=False)
#+---+---------+------------------------------------------+------+---------------------------------------+
#|ID |Timestamp|Properties |rownum|New_Props |
#+---+---------+------------------------------------------+------+---------------------------------------+
#|a |5 |{"a1": 3, "a2": 12, "a4": "r"} |1 |{"a1":"3","a2":"12","a4":"r"} |
#|a |7 |{"a1": 5, "a2": 8} |2 |{"a1":"5","a2":"8","a4":"r"} |
#|a |8 |{"a2": 4} |3 |{"a2":"4","a1":"5","a4":"r"} |
#|a |10 |{"a3": "z", "a4": "t"} |4 |{"a3":"z","a4":"t","a2":"4","a1":"5"} |
#|b |12 |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1 |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b |14 |{"b8": "y", "b3": 2} |2 |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b |20 |{"b2": "k", "b3": 9} |3 |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
#+---+---------+------------------------------------------+------+---------------------------------------+
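A note on the design: by default map_concat fails on duplicate keys (spark.sql.mapKeyDedupPolicy is EXCEPTION in Spark 3.x), which is why keys already present in the accumulator are filtered out with map_filter before concatenating. And since the array starts with the current map followed by the previous maps newest to oldest, the value kept for each key is always the most recent one; this also explains why the key order inside New_Props can differ from the input JSON.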
If you prefer using a SQL query, here is the equivalent SparkSQL:
WITH props AS (
SELECT *,
row_number() over(partition by ID order by Timestamp) AS rownum,
from_json(lag(Properties) over(partition by ID order by Timestamp), 'map<string,string>') AS prev_prop_map,
from_json(Properties, 'map<string,string>') AS current_prop_map
FROM props_tb
), cumulative_props AS (
SELECT *,
collect_list(prev_prop_map) over(partition by ID order by Timestamp) AS cumulative_prev_props
FROM props
)
SELECT ID,
Timestamp,
Properties,
aggregate(
concat(array(current_prop_map), reverse(cumulative_prev_props)),
cast(map() as map<string,string>),
(acc, x) -> map_concat(acc, map_filter(x, (k,v) -> ! array_contains(map_keys(acc), k)))
) AS New_Props,
rownum
FROM cumulative_props
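To run the SQL version, the input DataFrame has to be exposed under the table name props_tb that the query references (a minimal sketch; sql_query is an illustrative variable holding the WITH ... SELECT statement above):

# Register the DataFrame under the name used in the query's FROM clause.
df.createOrReplaceTempView("props_tb")
spark.sql(sql_query).show(truncate=False)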