在 PySpark 中对具有数组的列进行分组和聚合

Group by and aggregate on a column with array in PySpark

我有以下 PySpark 数据框。 column_2 为复杂数据类型 array>

Column_1 Column_2                 Column_3
A        [{Mat=7},{Phy=8}]        ABC
A        [{Mat=7},{Phy=8}]        CDE
B        [{Mat=6},{Phy=7}]        ZZZ

我必须对第 1 列和第 2 列进行分组,并获得第 3 列的最小聚合。

问题是当我尝试按第 1 列和第 2 列分组时出现错误

cannot be used as grouping expression because the data type is not an orderable data type

有没有办法将此列包含在分组依据中或以某种方式对其进行聚合。 column_2 中的值对于 column_1

中的键值将始终相同

预期输出:

Column_1 Column_2                Column_3
A        [{Mat=7},{Phy=8}]       ABC
B        [{Mat=6},{Phy=7}]       ZZZ

是否可以收集聚合函数中所有值的列表并将其展平并删除重复项?

The values in column_2 will always be same for a key value in column_1

如果是这样,那么你可以只取组中的first值。

测试数据帧:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('A', 'ABC', 7, 8),
     ('A', 'CDE', 7, 8),
     ('B', 'ZZZ', 6, 7)],
    ['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
    'Column_1',
    F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
    'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2                |Column_3|
# +--------+------------------------+--------+
# |A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
# |A       |[{Mat -> 7}, {Phy -> 8}]|CDE     |
# |B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
# +--------+------------------------+--------+

# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]

聚合:

df2 = df.groupBy('Column_1').agg(
    F.first('Column_2').alias('Column_2'),
    F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2                |Column_3|
# +--------+------------------------+--------+
# |A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
# |B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
# +--------+------------------------+--------+

我可能误解了你的问题。如果我这样做了,没有人会受益。

我以为你想要 select Column_2 中的最小值总和。因此,我稍微修改了数据框以确保组 A 具有多个值。见 df

df = spark.createDataFrame(
    [('A', 'ABC', 7, 8),
     ('A', 'CDE', 3, 8),
     ('B', 'ZZZ', 6, 7)],
    ['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
    'Column_1',
    F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
    'Column_3'
)
df.show(truncate=False)  

df

+--------+------------------------+--------+
|Column_1|Column_2                |Column_3|
+--------+------------------------+--------+
|A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
|A       |[{Mat -> 3}, {Phy -> 8}]|CDE     |
|B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
+--------+------------------------+--------+

解决方案

如果我的假设是正确的

  1. Column_2 中键值对的值提取到名为 filter
  2. 的列中
  3. 通过对它们求和来聚合它们。存出来进来filter
  4. Column_1filter
  5. 排序
  6. 删除子集 Column_1
  7. 的重复项

下面的代码

new = df.withColumn("filter",F.expr("aggregate(transform(Column_2,x -> map_values(x)[0] ),cast(0 as bigint),(x,i)->x+i)")).orderBy('Column_1',desc('filter')).dropDuplicates(['Column_1']).drop('filter')
new.show()
    
+--------+------------------------+--------+
|Column_1|Column_2                |Column_3|
+--------+------------------------+--------+
|A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
|B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
+--------+------------------------+--------+