在 PySpark 中对具有数组的列进行分组和聚合
Group by and aggregate on a column with array in PySpark
我有以下 PySpark 数据框。 column_2 为复杂数据类型 array
Column_1 Column_2 Column_3
A [{Mat=7},{Phy=8}] ABC
A [{Mat=7},{Phy=8}] CDE
B [{Mat=6},{Phy=7}] ZZZ
我必须对第 1 列和第 2 列进行分组,并获得第 3 列的最小聚合。
问题是当我尝试按第 1 列和第 2 列分组时出现错误
cannot be used as grouping expression because the data type is not an orderable data type
有没有办法将此列包含在分组依据中或以某种方式对其进行聚合。 column_2 中的值对于 column_1
中的键值将始终相同
预期输出:
Column_1 Column_2 Column_3
A [{Mat=7},{Phy=8}] ABC
B [{Mat=6},{Phy=7}] ZZZ
是否可以收集聚合函数中所有值的列表并将其展平并删除重复项?
The values in column_2 will always be same for a key value in column_1
如果是这样,那么你可以只取组中的first
值。
测试数据帧:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 7, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |A |[{Mat -> 7}, {Phy -> 8}]|CDE |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]
聚合:
df2 = df.groupBy('Column_1').agg(
F.first('Column_2').alias('Column_2'),
F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
我可能误解了你的问题。如果我这样做了,没有人会受益。
我以为你想要 select Column_2
中的最小值总和。因此,我稍微修改了数据框以确保组 A
具有多个值。见 df
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 3, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
df
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|A |[{Mat -> 3}, {Phy -> 8}]|CDE |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+
解决方案
如果我的假设是正确的
- 将
Column_2
中键值对的值提取到名为 filter
的列中
- 通过对它们求和来聚合它们。存出来进来
filter
- 按
Column_1
和 filter
排序
- 删除子集
Column_1
的重复项
下面的代码
new = df.withColumn("filter",F.expr("aggregate(transform(Column_2,x -> map_values(x)[0] ),cast(0 as bigint),(x,i)->x+i)")).orderBy('Column_1',desc('filter')).dropDuplicates(['Column_1']).drop('filter')
new.show()
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+
我有以下 PySpark 数据框。 column_2 为复杂数据类型 array
Column_1 Column_2 Column_3
A [{Mat=7},{Phy=8}] ABC
A [{Mat=7},{Phy=8}] CDE
B [{Mat=6},{Phy=7}] ZZZ
我必须对第 1 列和第 2 列进行分组,并获得第 3 列的最小聚合。
问题是当我尝试按第 1 列和第 2 列分组时出现错误
cannot be used as grouping expression because the data type is not an orderable data type
有没有办法将此列包含在分组依据中或以某种方式对其进行聚合。 column_2 中的值对于 column_1
中的键值将始终相同预期输出:
Column_1 Column_2 Column_3
A [{Mat=7},{Phy=8}] ABC
B [{Mat=6},{Phy=7}] ZZZ
是否可以收集聚合函数中所有值的列表并将其展平并删除重复项?
The values in column_2 will always be same for a key value in column_1
如果是这样,那么你可以只取组中的first
值。
测试数据帧:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 7, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |A |[{Mat -> 7}, {Phy -> 8}]|CDE |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]
聚合:
df2 = df.groupBy('Column_1').agg(
F.first('Column_2').alias('Column_2'),
F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
我可能误解了你的问题。如果我这样做了,没有人会受益。
我以为你想要 select Column_2
中的最小值总和。因此,我稍微修改了数据框以确保组 A
具有多个值。见 df
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 3, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
df
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|A |[{Mat -> 3}, {Phy -> 8}]|CDE |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+
解决方案
如果我的假设是正确的
- 将
Column_2
中键值对的值提取到名为filter
的列中
- 通过对它们求和来聚合它们。存出来进来
filter
- 按
Column_1
和filter
排序
- 删除子集
Column_1
的重复项
下面的代码
new = df.withColumn("filter",F.expr("aggregate(transform(Column_2,x -> map_values(x)[0] ),cast(0 as bigint),(x,i)->x+i)")).orderBy('Column_1',desc('filter')).dropDuplicates(['Column_1']).drop('filter')
new.show()
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+