PySparks/Databricks 中两列的图形处理
Graph processing of two columns in PySparks / Databricks
假设我有以下包含两列的数据框
value_1| value_2
----------------
1| 2
2| 3
4| 5
6| 5
4| 6
现在我想将我所有的值聚类到一个新的数据框,其中列 ID 包含每个出现的值和列 cluster_ID表示以某种方式同时出现的所有值的最小值:
ID | cluster_ID
----------------
1| 1
2| 1
3| 1
4| 4
5| 4
6| 4
请注意,即使值 1 和 3 现在直接 link,它们仍然聚集在 (1, 2, 3) 簇中,因为它们都与值 2 有一个连接.
由于我不知道如何以 Sparks 的方式解决这个问题,我尝试了以下操作:
首先,我创建了一个包含所有 ID 对的列表列表:
[[1, 2], [2, 3], [4, 5], [6, 5], [4, 6]]
然后我创建了一个列表列表,其中每个子列表代表带有此 for 循环的集群:
id_pair_list = [[1, 2], [2, 3], [4, 5], [6, 5], [4, 6]]
duplicate_list = []
for e in id_pair_list:
if not duplicate_list:
duplicate_list = [e]
else:
try:
index = next(i for i, value in enumerate(duplicate_list) if e[0] in value)
updated_list = duplicate_list[index]
updated_list.append(e[1])
duplicate_list[index] = updated_list
except StopIteration:
pass
try:
index = next(i for i, value in enumerate(duplicate_list) if e[1] in value)
updated_list = duplicate_list[index]
updated_list.append(e[0])
duplicate_list[index] = updated_list
except StopIteration:
duplicate_list.append(e)
set_duplicate_list = []
for e in duplicate_list:
set_duplicate_list.append(sorted(list(set(e))))
结果看起来像这样,符合预期:
[[1, 2, 3], [4, 5, 6]]
在此之后,我创建了这样的新数据框:
id_mapping_df = spark.createDataFrame(
[[set_duplicate_list]],
['col']
).select(
F.explode('col').alias('ID')
).withColumn(
'cluster_id',
F.array_min('ID')
).withColumn(
'ID',
F.explode('ID')
)
这给了我最终的结果
...但是...
不幸的是,这只适用于我的小示例数据集。
当我用我更大的真实数据集尝试这个时,我突然遇到了问题,一些值出现在多个集群子列表中,这不应该是这种情况。
我想这已经发生了,因为 Sparks 的 for 循环是一种反模式,并且通过在我的 4 个节点上分配工作负载,Sparks 没有保持我的集群列表的一个恒定状态。
我怎样才能以更好的 Sparks 兼容方式解决这个问题?
THX & BR
进入数字
在我看来这不像是集群。如果您想使用 Spark 进行集群,您可以在下面的 link 中找到关于从哪里开始的一些想法。
这对我来说更像是一个图形问题,而不是聚类问题。在 Databricks 中,您可以通过将相关的 GraphFrames 库上传到集群来使用 GraphFrames。 connectedComponents 算法计算出组。我用的是graphframes-0.8.0-spark3.0-s_2.12.jar,这个要看你的Spark(3.x)和Scala版本(2.12.x)
这是一个简单的例子:
单元格 1
%python
from graphframes import *
# Vertices dataframe
v = sqlContext.createDataFrame((
( 1, 2 ), ( 2, 3 ), ( 4, 5 ),
( 6, 5 ), ( 4, 6 )
)).toDF("id", "id2")
## Edge dataframe
e = sqlContext.createDataFrame((
(1, 2, "is linked to"),
(2, 3, "is linked to"),
(4, 5, "is linked to"),
(6, 5, "is linked to"),
(4, 6, "is linked to")
)).toDF("src", "dst", "relationship")
## Create the graph frame
g = GraphFrame(v, e)
print(g)
单元格 2
%python
## The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
components = g.connectedComponents() ## doesn't work on Spark 1.4
display(components)
此时,components
数据框将包含您需要的所有信息:
如果需要,您可以进一步操作它,例如,将其保存到临时视图并在其上 运行 一些常规 SQL:
小区 3
%python
components.createOrReplaceTempView("tmp")
单元格 4
%sql
SELECT id, component
FROM tmp
UNION
SELECT id2, component
FROM tmp
ORDER BY 1, 2
SQL 结果:
如果您的数据已经在数据框中,则只需 select 和 where 过滤器即可轻松从原始数据框生成边缘数据框,例如,请参见此处的 .
假设我有以下包含两列的数据框
value_1| value_2
----------------
1| 2
2| 3
4| 5
6| 5
4| 6
现在我想将我所有的值聚类到一个新的数据框,其中列 ID 包含每个出现的值和列 cluster_ID表示以某种方式同时出现的所有值的最小值:
ID | cluster_ID
----------------
1| 1
2| 1
3| 1
4| 4
5| 4
6| 4
请注意,即使值 1 和 3 现在直接 link,它们仍然聚集在 (1, 2, 3) 簇中,因为它们都与值 2 有一个连接.
由于我不知道如何以 Sparks 的方式解决这个问题,我尝试了以下操作:
首先,我创建了一个包含所有 ID 对的列表列表:
[[1, 2], [2, 3], [4, 5], [6, 5], [4, 6]]
然后我创建了一个列表列表,其中每个子列表代表带有此 for 循环的集群:
id_pair_list = [[1, 2], [2, 3], [4, 5], [6, 5], [4, 6]]
duplicate_list = []
for e in id_pair_list:
if not duplicate_list:
duplicate_list = [e]
else:
try:
index = next(i for i, value in enumerate(duplicate_list) if e[0] in value)
updated_list = duplicate_list[index]
updated_list.append(e[1])
duplicate_list[index] = updated_list
except StopIteration:
pass
try:
index = next(i for i, value in enumerate(duplicate_list) if e[1] in value)
updated_list = duplicate_list[index]
updated_list.append(e[0])
duplicate_list[index] = updated_list
except StopIteration:
duplicate_list.append(e)
set_duplicate_list = []
for e in duplicate_list:
set_duplicate_list.append(sorted(list(set(e))))
结果看起来像这样,符合预期:
[[1, 2, 3], [4, 5, 6]]
在此之后,我创建了这样的新数据框:
id_mapping_df = spark.createDataFrame(
[[set_duplicate_list]],
['col']
).select(
F.explode('col').alias('ID')
).withColumn(
'cluster_id',
F.array_min('ID')
).withColumn(
'ID',
F.explode('ID')
)
这给了我最终的结果
...但是...
不幸的是,这只适用于我的小示例数据集。 当我用我更大的真实数据集尝试这个时,我突然遇到了问题,一些值出现在多个集群子列表中,这不应该是这种情况。
我想这已经发生了,因为 Sparks 的 for 循环是一种反模式,并且通过在我的 4 个节点上分配工作负载,Sparks 没有保持我的集群列表的一个恒定状态。
我怎样才能以更好的 Sparks 兼容方式解决这个问题?
THX & BR 进入数字
在我看来这不像是集群。如果您想使用 Spark 进行集群,您可以在下面的 link 中找到关于从哪里开始的一些想法。
这对我来说更像是一个图形问题,而不是聚类问题。在 Databricks 中,您可以通过将相关的 GraphFrames 库上传到集群来使用 GraphFrames。 connectedComponents 算法计算出组。我用的是graphframes-0.8.0-spark3.0-s_2.12.jar,这个要看你的Spark(3.x)和Scala版本(2.12.x)
这是一个简单的例子:
单元格 1
%python
from graphframes import *
# Vertices dataframe
v = sqlContext.createDataFrame((
( 1, 2 ), ( 2, 3 ), ( 4, 5 ),
( 6, 5 ), ( 4, 6 )
)).toDF("id", "id2")
## Edge dataframe
e = sqlContext.createDataFrame((
(1, 2, "is linked to"),
(2, 3, "is linked to"),
(4, 5, "is linked to"),
(6, 5, "is linked to"),
(4, 6, "is linked to")
)).toDF("src", "dst", "relationship")
## Create the graph frame
g = GraphFrame(v, e)
print(g)
单元格 2
%python
## The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
components = g.connectedComponents() ## doesn't work on Spark 1.4
display(components)
此时,components
数据框将包含您需要的所有信息:
如果需要,您可以进一步操作它,例如,将其保存到临时视图并在其上 运行 一些常规 SQL:
小区 3
%python
components.createOrReplaceTempView("tmp")
单元格 4
%sql
SELECT id, component
FROM tmp
UNION
SELECT id2, component
FROM tmp
ORDER BY 1, 2
SQL 结果:
如果您的数据已经在数据框中,则只需 select 和 where 过滤器即可轻松从原始数据框生成边缘数据框,例如,请参见此处的