将相关值归为一组

Group the related values into one group

尝试根据相关记录对列值进行分组

# Column layout: the part number followed by five related-value columns.
partColumns = [
    "partnumber",
    "colVal1",
    "colVal2",
    "colVal3",
    "colVal4",
    "colVal5",
]

# Sample relations, one tuple per row in the column order above.
# Unused slots hold an empty string.
partrelations = [
    ("part0", "part1", "", "", "", ""),
    ("part1", "", "part2", "", "part4", ""),
    ("part2", "part3", "", "part5", "part6", "part7"),
    ("part10", "part11", "", "", "", ""),
    ("part11", "part13", "part21", "", "", ""),
    ("part13", "part21", "part18", "", "part20", ""),
]

df_part_groups = spark.createDataFrame(partrelations, partColumns)

尝试获得如下输出 -


# Build one edge per (partnumber, related value) pair. Every related column is
# used, not only colVal1, so parts linked through any column land in the same
# connected component.
related_columns = [c for c in df_part_groups.columns if c != "partnumber"]
related_array = ", ".join(f"`{c}`" for c in related_columns)

edges = (df_part_groups
         .selectExpr("partnumber AS src",
                     f"explode(array({related_array})) AS dst")
         # Empty strings mark unused slots; this comparison also drops NULLs.
         .where("dst <> ''"))

# Every part is a vertex, whether it appears as a row key or only as a related
# value. De-duplicate after the union so each id occurs exactly once.
vertices = (df_part_groups.selectExpr("partnumber AS id")
            .union(edges.selectExpr("dst AS id"))
            .distinct())

# Create a graph and find all connected components.
# NOTE(review): GraphFrames' connectedComponents() normally needs a Spark
# checkpoint directory to be configured - confirm for this environment.
g = G.GraphFrame(vertices, edges)
cc = g.connectedComponents()

# Attach the component id to each original row. The join key is `partnumber`
# (the DataFrame has no `device` column).
display(df_part_groups
        .join(cc, df_part_groups.partnumber == cc.id)
        .orderBy("component", "partnumber", "colVal1"))

以上是我要整理的内容

感谢帮助!!

我们可以通过集合交集做一个简单的检查来解决这个问题。(我对 GraphFrames 不熟悉 :( )

第 1 步:将每一行的所有部分合并到一个数组中

from pyspark.sql import functions as F
    
# Pack the part number and all of its related values into one array column
# named `parts`, so each row can be treated as a single list of parts.
df_part_groups1 = df_part_groups.withColumn(
    'parts',
    F.array(
        'partnumber',
        'colVal1',
        'colVal2',
        'colVal3',
        'colVal4',
        'colVal5',
    ),
)

第 2 步:获取 all_parts,即由每一行的部件列表组成的列表(列表的列表),因为分组需要跨行才能确定。

def clean_lists(plists):
  """Return a new list of lists with the falsy entries removed.

  Each inner iterable of *plists* is copied into a fresh list that keeps only
  its truthy items (so empty strings and None are dropped), in original order.
  """
  cleaned = []
  for parts in plists:
    cleaned.append([part for part in parts if part])
  return cleaned

# Bring every row's `parts` array to the driver as a list of lists and strip
# the empty placeholders. Collecting the column directly avoids the dummy
# groupBy and also yields [] (instead of raising IndexError) for an empty
# DataFrame.
all_parts = clean_lists(
    [row.parts for row in df_part_groups1.select('parts').collect()]
)

第 3 步:使用收集到的 all_parts 获取分组数据。
def part_of_existing_group(gps, pl):
  """Merge *pl* into whichever existing group(s) it shares a part with.

  Args:
    gps: dict mapping group name -> list of parts. Updated in place.
    pl: iterable of parts taken from one row.

  Returns:
    True if *pl* overlapped at least one group and was merged into it,
    False if it overlapped none (in which case *gps* is left untouched).
  """
  incoming = set(pl)
  matches = [key for key, members in gps.items()
             if incoming.intersection(members)]
  if not matches:
    return False

  # A row can bridge several groups that were separate so far; fold all of
  # them into the first match so they end up as a single group.
  target, *absorbed = matches
  merged = list(gps[target])
  for key in absorbed:
    merged.extend(gps.pop(key))
  merged.extend(pl)

  # dict.fromkeys de-duplicates while keeping a deterministic order.
  gps[target] = list(dict.fromkeys(merged))
  return True
      
def findGroups(plists):
  """Partition the part lists in *plists* into named groups.

  Each list is offered to part_of_existing_group(); when no existing group
  takes it, the list starts a new group. Groups are named 'G1', 'G2', ... in
  order of creation.

  Returns:
    dict mapping group name -> list of parts.
  """
  groups = {}
  next_index = 1
  for parts in plists:
    # Nothing to compare against while there are no groups yet.
    if groups and part_of_existing_group(groups, parts):
      continue
    groups[f'G{next_index}'] = parts
    next_index += 1
  return groups

第 4 步:根据您创建的组映射分配组。

 groups = findGroups(all_parts)
    
    @udf
def get_group_val(part):
  for key in groups.keys():
    if part in groups[key]:
      return key
  return -1

# One output row per distinct, non-empty part, tagged with its group name.
df_part_groups2 = (
    df_part_groups1
    .withColumn('part', F.explode('parts'))      # one row per array element
    .dropDuplicates(['part'])                    # keep a single row per part
    .where(~F.col('part').like(''))              # drop the empty placeholders
    .select('part', 'parts')
    .withColumn('Group', get_group_val('part'))
)

df_part_groups2.show()
+------+--------------------+-----+
|  part|               parts|Group|
+------+--------------------+-----+
| part0|[part0, part1, , ...|   G1|
| part1|[part0, part1, , ...|   G1|
|part10|[part10, part11, ...|   G2|
|part11|[part10, part11, ...|   G2|
|part13|[part11, part13, ...|   G2|
|part18|[part13, part21, ...|   G2|
| part2|[part1, , part2, ...|   G1|
|part20|[part13, part21, ...|   G2|
|part21|[part11, part13, ...|   G2|
| part3|[part2, part3, , ...|   G1|
| part4|[part1, , part2, ...|   G1|
| part5|[part2, part3, , ...|   G1|
| part6|[part2, part3, , ...|   G1|
| part7|[part2, part3, , ...|   G1|
+------+--------------------+-----+