PySpark - 在不使用 UDF 或连接的情况下创建多个聚合映射列

PySpark - create multiple aggregative map columns without using UDF or join

我有一个看起来类似于此的巨大数据框:

+----+-------+-------+-----+
|name|level_A|level_B|hours|
+----+-------+-------+-----+
| Bob|     10|      3|    5|
| Bob|     10|      3|   15|
| Bob|     20|      3|   25|
| Sue|     30|      3|   35|
| Sue|     30|      7|   45|
+----+-------+-------+-----+

我想要的输出:

+----+--------------------+------------------+
|name|         map_level_A|       map_level_B|
+----+--------------------+------------------+
| Bob|{10 -> 20, 20 -> 25}|         {3 -> 45}|
| Sue|          {30 -> 80}|{7 -> 45, 3 -> 35}|
+----+--------------------+------------------+

意思是,按 name 分组,添加 2 个映射 level_Alevel_B 的 MapType 列到 hours.

的总和

我知道我可以使用 UDF 或连接操作获得该输出。

但是实际上,数据很大,而且不是2个地图列,而是几十个,所以join/UDF成本太高了。

有没有更有效的方法?

您可以考虑使用 Window 函数。您需要为每个被 namelevel_X 分区的 level_X 计算一个 windowspec 来计算 hours 的总和。然后按 name 分组并从结构数组创建映射:

from pyspark.sql import Window
import pyspark.sql.functions as F

df = spark.createDataFrame([("Bob", 10, 3, 5), ("Bob", 10, 3, 15), ("Bob", 20, 3, 25), 
                            ("Sue", 30, 3, 35),("Sue", 30, 7, 45), ], 
                           ["name", "level_A", "level_B", "hours"])

wla = Window.partitionBy("name", "level_A")
wlb = Window.partitionBy("name", "level_B")

result = df.withColumn("hours_A", F.sum("hours").over(wla)) \
    .withColumn("hours_B", F.sum("hours").over(wlb)) \
    .groupBy("name") \
    .agg(
        F.map_from_entries(
            F.collect_set(F.struct(F.col("level_A"), F.col("hours_A")))
        ).alias("map_level_A"),
        F.map_from_entries(
            F.collect_set(F.struct(F.col("level_B"), F.col("hours_B")))
        ).alias("map_level_B")
    )

result.show()

#+----+--------------------+------------------+
#|name|         map_level_A|       map_level_B|
#+----+--------------------+------------------+
#| Sue|          {30 -> 80}|{3 -> 35, 7 -> 45}|
#| Bob|{10 -> 20, 20 -> 25}|         {3 -> 45}|
#+----+--------------------+------------------+