通过仅从两列中获取唯一值来对 pyspark 数据框的列进行分组

Group column of pyspark dataframe by taking only unique values from two columns

我想根据来自 pyspark 数据框的两列的唯一值对一列进行分组。数据框的输出应该是这样的,一旦某个值用于 groupby 并且如果它存在于另一列中那么它不应该重复。

    |------------------|-------------------|
    |   fruit          |     fruits        | 
    |------------------|-------------------|
    |    apple         |     banana        |
    |    banana        |     apple         |
    |    apple         |     mango         |
    |    orange        |     guava         |
    |    apple         |    pineapple      |
    |    mango         |    apple          |
    |   banana         |     mango         |
    |   banana         |    pineapple      |
    | -------------------------------------|

我尝试过使用单列进行分组,但需要修改或需要一些其他逻辑。

df9=final_main.groupBy('fruit').agg(collect_list('fruits').alias('values'))

我从上面的查询中得到以下输出;

       |------------------|--------------------------------|
       |   fruit          |     values                     | 
       |------------------|--------------------------------|
       |  apple           | ['banana','mango','pineapple'] |
       |  banana          | ['apple']                      |
       |  orange          | ['guava']                      |
       |  mango           | ['apple']                      |
       |------------------|--------------------------------|

但我想要以下输出;

       |------------------|--------------------------------|
       |   fruit          |     values                     | 
       |------------------|--------------------------------|
       |  apple           | ['banana','mango','pineapple'] |
       |  orange          | ['guava']                      |
       |------------------|--------------------------------|

这看起来像是连通分量问题。有几种方法可以做到这一点。

1. GraphFrames

您可以使用 GraphFrames 包。数据框的每一行都定义了一条边,您可以使用 df 作为边并使用所有不同水果的数据框作为顶点来创建图形。然后调用 connectedComponents 方法。然后您可以操纵输出以获得您想要的结果。

2。只是 Pyspark

第二种方法有点麻烦。为每一行创建一个 "hash",如

hashed_df = df.withColumn('hash', F.sort_array(F.array(F.col('fruit'), F.col('fruits'))))

删除该列的所有非不同行

distinct_df = hashed_df.dropDuplicates(['hash'])

再次拆分项目

revert_df = distinct_df.withColumn('fruit', F.col('hash')[0]) \
    .withColumn('fruits', F.col('hash')[1])

按第一列分组

grouped_df = revert_df.groupBy('fruit').agg(F.collect_list('fruits').alias('group'))

如果 Pyspark 抱怨,您可能需要 "stringify" 使用 F.concat_ws 您的哈希,但想法是一样的。