Updating json column using window cumulative via pyspark

I have been trying to update a series of JSON blobs via PySpark, SparkSQL, and Pandas, with no success. The data looks like this:

#+---+---------+------------------------------------------+
#|ID |Timestamp|Properties                                |
#+---+---------+------------------------------------------+
#|a  |7        |{"a1": 5, "a2": 8}                        |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|
#|a  |8        |{"a2": 4}                                 |
#|a  |10       |{"a3": "z", "a4": "t"}                    |
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |
#|b  |20       |{"b2": "k", "b3": 9}                      |
#|b  |14       |{"b8": "y", "b3": 2}                      |
#+---+---------+------------------------------------------+
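
A minimal snippet to reproduce this sample data as a PySpark DataFrame (Properties stored as a plain JSON string):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ("a", 7,  '{"a1": 5, "a2": 8}'),
    ("b", 12, '{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}'),
    ("a", 8,  '{"a2": 4}'),
    ("a", 10, '{"a3": "z", "a4": "t"}'),
    ("a", 5,  '{"a1": 3, "a2": 12, "a4": "r"}'),
    ("b", 20, '{"b2": "k", "b3": 9}'),
    ("b", 14, '{"b8": "y", "b3": 2}'),
]
df = spark.createDataFrame(data, ["ID", "Timestamp", "Properties"])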

I want a query that partitions the rows by the ID field and orders them by the Timestamp field. After that, the Properties field should be cumulatively merged within each partition to create a new column, New_Props. So the output would look like this:

#+---+---------+------------------------------------------+------------------------------------------+------+
#|ID |Timestamp|Properties                                |New_Props                                 |rownum|
#+---+---------+------------------------------------------+------------------------------------------+------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |{"a1": 3, "a2": 12, "a4": "r"}            |1     |
#|a  |7        |{"a1": 5, "a2": 8}                        |{"a1": 5, "a2": 8, "a4": "r"}             |2     |
#|a  |8        |{"a2": 4}                                 |{"a1": 5, "a2": 4, "a4": "r"}             |3     |
#|a  |10       |{"a3": "z", "a4": "t"}                    |{"a1": 5, "a2": 4, "a3": "z", "a4": "t"}  |4     |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |
#|b  |14       |{"b8": "y", "b3": 2}                      |{"b1": 36, "b2": "u", "b3": 2, "b8": "y"} |2     |
#|b  |20       |{"b2": "k", "b3": 9}                      |{"b1": 36, "b2": "k", "b3": 9, "b8": "y"} |3     |
#+---+---------+------------------------------------------+------------------------------------------+------+

The rule: starting from rownum 2, take the New_Props value of the previous row (rownum 1) and update it with the Properties values of the current row (rownum 2).
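
To make that rule concrete, here is a plain-Python illustration of the merge for the rows of ID a (already sorted by Timestamp), using dict.update:

import json

# Properties of ID "a", sorted by Timestamp (values taken from the table above)
rows = [
    {"a1": 3, "a2": 12, "a4": "r"},   # Timestamp 5
    {"a1": 5, "a2": 8},               # Timestamp 7
    {"a2": 4},                        # Timestamp 8
    {"a3": "z", "a4": "t"},           # Timestamp 10
]

acc = {}
for props in rows:
    acc.update(props)        # new keys are added, existing keys take the newer value
    print(json.dumps(acc))   # matches New_Props for that row (up to key order)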

I tried using the LAG function, but I cannot reference the column that I am computing inside the function itself.

To create the Next Props column, I tried this CASE statement, but it doesn't work:

CASE
    WHEN rownum != 1 THEN concat(properties, LAG(next_props, 1) OVER (PARTITION BY contentid ORDER BY updateddatetime))
    ELSE next_props
END AS new_props

I have been trying different things for a while now, but I'm stuck. I could probably do it with a for loop and Python's dict.update() function, but I'm worried about efficiency. Thanks for your help.

Here's one way using higher-order functions on array and map columns:

  1. Use lag over the window to get the previous row's Properties for each row, and convert both the previous and the current Properties to map type
  2. Use collect_list over the window to get the cumulative array of the previous rows' Properties maps
  3. Add the current row's Properties map to that array and use aggregate to concatenate the inner maps with map_concat. From your example, the update simply adds new keys, so before concatenating we use the map_filter function to drop keys that already exist in the accumulator
  4. Use to_json to get a JSON string from the aggregated map and drop the intermediary columns

from pyspark.sql import functions as F, Window

w = Window.partitionBy("ID").orderBy("Timestamp")

df1 = df.withColumn("rownum", F.row_number().over(w)) \
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
    .withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
    .withColumn(
        "New_Props",
        F.to_json(F.aggregate(
            # current row's map first, then the previous maps from newest to oldest
            F.concat(F.array("current_prop_map"), F.reverse(F.col("cumulative_prev_props"))),
            F.expr("cast(map() as map<string,string>)"),
            # only keep keys that are not yet in the accumulator, so the newest value of each key wins
            lambda acc, x: F.map_concat(
                acc,
                F.map_filter(x, lambda k, _: ~F.array_contains(F.map_keys(acc), k))
            )
        ))
    ).drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")


df1.show(truncate=False)
#+---+---------+------------------------------------------+------+---------------------------------------+
#|ID |Timestamp|Properties                                |rownum|New_Props                              |
#+---+---------+------------------------------------------+------+---------------------------------------+
#|a  |5        |{"a1": 3, "a2": 12, "a4": "r"}            |1     |{"a1":"3","a2":"12","a4":"r"}          |
#|a  |7        |{"a1": 5, "a2": 8}                        |2     |{"a1":"5","a2":"8","a4":"r"}           |
#|a  |8        |{"a2": 4}                                 |3     |{"a2":"4","a1":"5","a4":"r"}           |
#|a  |10       |{"a3": "z", "a4": "t"}                    |4     |{"a3":"z","a4":"t","a2":"4","a1":"5"}  |
#|b  |12       |{"b1": 36, "b2": "u", "b3": 17, "b8": "c"}|1     |{"b1":"36","b2":"u","b3":"17","b8":"c"}|
#|b  |14       |{"b8": "y", "b3": 2}                      |2     |{"b8":"y","b3":"2","b1":"36","b2":"u"} |
#|b  |20       |{"b2": "k", "b3": 9}                      |3     |{"b2":"k","b3":"9","b8":"y","b1":"36"} |
#+---+---------+------------------------------------------+------+---------------------------------------+
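
Note that F.aggregate and F.map_filter with Python lambdas were only added to the Python API in Spark 3.1. If you are on an older version where the underlying SQL higher-order functions are available, the same aggregation can be pushed into a single F.expr instead; a sketch of that variant:

# Same pipeline as above, with the aggregation written as one SQL expression
new_props_expr = F.expr("""
    to_json(aggregate(
        concat(array(current_prop_map), reverse(cumulative_prev_props)),
        cast(map() as map<string,string>),
        (acc, x) -> map_concat(acc, map_filter(x, (k, v) -> ! array_contains(map_keys(acc), k)))
    ))
""")

df1_alt = df.withColumn("rownum", F.row_number().over(w)) \
    .withColumn("prev_prop_map", F.from_json(F.lag("Properties").over(w), "map<string,string>")) \
    .withColumn("current_prop_map", F.from_json("Properties", "map<string,string>")) \
    .withColumn("cumulative_prev_props", F.collect_list("prev_prop_map").over(w)) \
    .withColumn("New_Props", new_props_expr) \
    .drop("prev_prop_map", "current_prop_map", "cumulative_prev_props")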

If you prefer working with SQL queries, here is the equivalent SparkSQL:

WITH props AS (
    SELECT  *,
            row_number() over(partition by ID order by Timestamp) AS rownum,
            from_json(lag(Properties) over(partition by ID order by Timestamp), 'map<string,string>') AS prev_prop_map,
            from_json(Properties, 'map<string,string>') AS current_prop_map
    FROM    props_tb
),  cumulative_props AS (
    SELECT  *,
            collect_list(prev_prop_map) over(partition by ID order by Timestamp) AS cumulative_prev_props
    FROM    props 
)

SELECT  ID,
        Timestamp,
        Properties,
        aggregate(
            concat(array(current_prop_map), reverse(cumulative_prev_props)),
            cast(map() as map<string,string>),
            (acc, x) -> map_concat(acc, map_filter(x, (k,v) -> ! array_contains(map_keys(acc), k)))
        ) AS New_Props,
        rownum
FROM    cumulative_props
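
To run the SQL version from PySpark, the DataFrame first has to be registered under the table name the query reads from (props_tb); for example (sql_query below is just a placeholder variable holding the statement text above):

# Register the DataFrame under the name used in the FROM clause of the query above
df.createOrReplaceTempView("props_tb")

df1 = spark.sql(sql_query)
df1.show(truncate=False)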