Spark - 聚合 children 和 parent 条记录
Spark - aggregate and sum children with parent records
我处理具有树结构的数据。每个parent可以有多个children。 Parent 没有关于 children 的信息,但是每个 child 都知道它的 parent。此外,每个 child 都知道它的完整路径 - 这是一串连接的 parent 标识符,因此每个记录都知道它在树中的级别。该记录的结构为:
id | parent_id | path
--- + --------- + ------
11 | 1 | 1-11
12 | 1 | 1-12
121 | 12 | 1-12-121
现在我必须读取 table,按 id 分组并对 bigint 类型的列 value
求和。最重要的事实是只有叶子 - 没有 children 的元素 - 具有指定值,每个 parent 必须是其所有 children 值的总和。最初所有 parent 的值都等于 0。
分组前:
Root
| - Parent 1 (value = 0)
| - - Child 11 (value = 1)
| - - Child 12 (value = 1)
| - - Parent 13 (value = 0)
| - - - Child 131 (value = 2)
| - - - Child 132 (value = 1)
| - Parent 2 (value = 0)
| - - Child 21 (value = 2)
| - - Child 22 (value = 1)
分组结果:
Root
| - Parent 1 (value = 5 (1 + 1 + 3))
| - - Child 11 (value = 1)
| - - Child 12 (value = 1)
| - - Parent 13 (value = 3 (2 + 1))
| - - - Child 131 (value = 2)
| - - - Child 132 (value = 1)
| - Parent 2 (value = 3 (2 + 1))
| - - Child 21 (value = 2)
| - - Child 22 (value = 1)
而且非常重要的要求:我无法收集这些数据并在内存中分组,因为数据集真的很大,所以我必须使用数据集或数据框来完成。
如果我没理解错的话,您只对每个节点的值之和感兴趣。在这种情况下,您只需查看每次路径之一中出现的节点,并为相应节点添加所有此类值。火花版本将是:
scala> val df = spark.sql(s"""
select
col1 as id,
col2 as parent_id,
col3 as path,
col4 as value
from values
(11, 1, "1-11", 1),
(12, 1, "1-12", 1),
(13, 1, "1-13", 0),
(131, 13, "1-13-131", 2),
(132, 13, "1-13-132", 1)
""")
scala> (df
.withColumn("path_arr", split(col("path"), "-"))
.select($"value", explode($"path_arr").as("node"))
.groupBy("node")
.sum()
.orderBy($"node")
).show
产生:
+----+----------+
|node|sum(value)|
+----+----------+
| 1| 5|
| 11| 1|
| 12| 1|
| 13| 3|
| 131| 2|
| 132| 1|
+----+----------+
我处理具有树结构的数据。每个parent可以有多个children。 Parent 没有关于 children 的信息,但是每个 child 都知道它的 parent。此外,每个 child 都知道它的完整路径 - 这是一串连接的 parent 标识符,因此每个记录都知道它在树中的级别。该记录的结构为:
id | parent_id | path
--- + --------- + ------
11 | 1 | 1-11
12 | 1 | 1-12
121 | 12 | 1-12-121
现在我必须读取 table,按 id 分组并对 bigint 类型的列 value
求和。最重要的事实是只有叶子 - 没有 children 的元素 - 具有指定值,每个 parent 必须是其所有 children 值的总和。最初所有 parent 的值都等于 0。
分组前:
Root
| - Parent 1 (value = 0)
| - - Child 11 (value = 1)
| - - Child 12 (value = 1)
| - - Parent 13 (value = 0)
| - - - Child 131 (value = 2)
| - - - Child 132 (value = 1)
| - Parent 2 (value = 0)
| - - Child 21 (value = 2)
| - - Child 22 (value = 1)
分组结果:
Root
| - Parent 1 (value = 5 (1 + 1 + 3))
| - - Child 11 (value = 1)
| - - Child 12 (value = 1)
| - - Parent 13 (value = 3 (2 + 1))
| - - - Child 131 (value = 2)
| - - - Child 132 (value = 1)
| - Parent 2 (value = 3 (2 + 1))
| - - Child 21 (value = 2)
| - - Child 22 (value = 1)
而且非常重要的要求:我无法收集这些数据并在内存中分组,因为数据集真的很大,所以我必须使用数据集或数据框来完成。
如果我没理解错的话,您只对每个节点的值之和感兴趣。在这种情况下,您只需查看每次路径之一中出现的节点,并为相应节点添加所有此类值。火花版本将是:
scala> val df = spark.sql(s"""
select
col1 as id,
col2 as parent_id,
col3 as path,
col4 as value
from values
(11, 1, "1-11", 1),
(12, 1, "1-12", 1),
(13, 1, "1-13", 0),
(131, 13, "1-13-131", 2),
(132, 13, "1-13-132", 1)
""")
scala> (df
.withColumn("path_arr", split(col("path"), "-"))
.select($"value", explode($"path_arr").as("node"))
.groupBy("node")
.sum()
.orderBy($"node")
).show
产生:
+----+----------+
|node|sum(value)|
+----+----------+
| 1| 5|
| 11| 1|
| 12| 1|
| 13| 3|
| 131| 2|
| 132| 1|
+----+----------+