PySparks/Databricks 中两列的图形处理

Graph processing of two columns in PySparks / Databricks

假设我有以下包含两列的数据框

value_1| value_2
----------------
      1|       2
      2|       3
      4|       5
      6|       5
      4|       6

现在我想将我所有的值聚类到一个新的数据框,其中列 ID 包含每个出现的值和列 cluster_ID表示以某种方式同时出现的所有值的最小值:

ID  | cluster_ID
----------------
   1|          1
   2|          1
   3|          1
   4|          4
   5|          4
   6|          4

请注意,即使值 1 和 3 现在直接 link,它们仍然聚集在 (1, 2, 3) 簇中,因为它们都与值 2 有一个连接.

由于我不知道如何以 Sparks 的方式解决这个问题,我尝试了以下操作:

首先,我创建了一个包含所有 ID 对的列表列表:

[[1, 2], [2, 3], [4, 5], [6, 5], [4, 6]]

然后我创建了一个列表列表,其中每个子列表代表带有此 for 循环的集群:

id_pair_list = [[1, 2], [2, 3], [4, 5], [6, 5], [4, 6]]

duplicate_list = []

for e in id_pair_list:
  if not duplicate_list:
    duplicate_list = [e]
  else:
    try:
      index = next(i for i, value in enumerate(duplicate_list) if e[0] in value)
      updated_list = duplicate_list[index] 
      updated_list.append(e[1])       
      duplicate_list[index] = updated_list
    except StopIteration:
      pass
      try:
        index = next(i for i, value in enumerate(duplicate_list) if e[1] in value)
        updated_list = duplicate_list[index]
        updated_list.append(e[0]) 
        duplicate_list[index] = updated_list
      except StopIteration:
        duplicate_list.append(e)   
        
set_duplicate_list = []
for e in duplicate_list:
  set_duplicate_list.append(sorted(list(set(e))))

结果看起来像这样,符合预期:

[[1, 2, 3], [4, 5, 6]]

在此之后,我创建了这样的新数据框:

id_mapping_df = spark.createDataFrame(
    [[set_duplicate_list]], 
    ['col']
).select(
    F.explode('col').alias('ID')
).withColumn(
    'cluster_id', 
    F.array_min('ID')
).withColumn(
    'ID', 
    F.explode('ID')
)

这给了我最终的结果

...但是...

不幸的是,这只适用于我的小示例数据集。 当我用我更大的真实数据集尝试这个时,我突然遇到了问题,一些值出现在多个集群子列表中,这不应该是这种情况。

我想这已经发生了,因为 Sparks 的 for 循环是一种反模式,并且通过在我的 4 个节点上分配工作负载,Sparks 没有保持我的集群列表的一个恒定状态。

我怎样才能以更好的 Sparks 兼容方式解决这个问题?

THX & BR 进入数字

在我看来这不像是集群。如果您想使用 Spark 进行集群,您可以在下面的 link 中找到关于从哪里开始的一些想法。

https://spark.apache.org/docs/latest/ml-clustering.html

这对我来说更像是一个图形问题,而不是聚类问题。在 Databricks 中,您可以通过将相关的 GraphFrames 库上传到集群来使用 GraphFrames。 connectedComponents 算法计算出组。我用的是graphframes-0.8.0-spark3.0-s_2.12.jar,这个要看你的Spark(3.x)和Scala版本(2.12.x)

这是一个简单的例子:

单元格 1

%python
from graphframes import *

# Vertices dataframe
v = sqlContext.createDataFrame((
  ( 1, 2 ), ( 2, 3 ), ( 4, 5 ),
  ( 6, 5 ), ( 4, 6 )
)).toDF("id", "id2")

## Edge dataframe
e = sqlContext.createDataFrame((
  (1, 2, "is linked to"),
  (2, 3, "is linked to"),
  (4, 5, "is linked to"),
  (6, 5, "is linked to"),
  (4, 6, "is linked to") 
)).toDF("src", "dst", "relationship")


## Create the graph frame
g = GraphFrame(v, e)
print(g)

单元格 2

%python
## The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")

components = g.connectedComponents() ## doesn't work on Spark 1.4
display(components)

此时,components 数据框将包含您需要的所有信息:

如果需要,您可以进一步操作它,例如,将其保存到临时视图并在其上 运行 一些常规 SQL:

小区 3

%python
components.createOrReplaceTempView("tmp")

单元格 4

%sql
SELECT id, component
FROM tmp
UNION
SELECT id2, component
FROM tmp
ORDER BY 1, 2

SQL 结果:

如果您的数据已经在数据框中,则只需 select 和 where 过滤器即可轻松从原始数据框生成边缘数据框,例如,请参见此处的 .