如何从 Pyspark 中的 spark 数据框创建边缘列表?
How to create edge list from spark data frame in Pyspark?
我在 pyspark 中使用 graphframes
进行某些图形类型的分析,想知道从顶点数据框创建边列表数据框的最佳方法是什么。
例如,下面是我的顶点数据框。我有一个 ID 列表,它们属于不同的组。
+---+-----+
|id |group|
+---+-----+
|a |1 |
|b |2 |
|c |1 |
|d |2 |
|e |3 |
|a |3 |
|f |1 |
+---+-----+
我的objective是创建一个边缘列表数据框来指示出现在公共组中的id。 请注意,1 个 id 可能出现在多个组中(例如,上面的 id a 在第 1 组和第 3 组中)。 下面是我想要获取的边缘列表数据框:
+---+-----+-----+
|src|dst |group|
+---+-----+-----+
|a |c |1 |
|a |f |1 |
|c |f |1 |
|b |d |2 |
|a |e |3 |
+---+-----+-----+
提前致谢!
编辑 1
不确定这是否是更好的解决方法,但我做了一个解决方法:
import pyspark.sql.functions as f
df = df.withColumn('match', f.collect_set('id').over(Window.partitionBy('group')))
df = df.select(f.col('id').alias('src'),
f.explode('match').alias('dst'),
f.col('group'))
df = df.withColumn('duplicate_edges', f.array_sort(f.array('src', 'dst')))
df = (df
.where(f.col('src') != f.col('dst'))
.drop_duplicates(subset=['duplicate_edges'])
.drop('duplicate_edges'))
df.sort('group', 'src', 'dst').show()
输出
+---+---+-----+
|src|dst|group|
+---+---+-----+
| a| c| 1|
| a| f| 1|
| c| f| 1|
| b| d| 2|
| e| a| 3|
+---+---+-----+
原回答
试试这个:
import pyspark.sql.functions as f
df = (df
.groupby('group')
.agg(f.first('id').alias('src'),
f.last('id').alias('dst')))
df.show()
输出:
+-----+---+---+
|group|src|dst|
+-----+---+---+
| 1| a| c|
| 3| e| a|
| 2| b| d|
+-----+---+---+
您可以进行自我加入:
df = df.toDF('src', 'group')
df2 = df.toDF('dst', 'group2')
result = df.join(
df2,
(df.group == df2.group2) & (df.src < df2.dst)
).select('src', 'dst', 'group').distinct().orderBy('group', 'src', 'dst')
result.show()
+---+---+-----+
|src|dst|group|
+---+---+-----+
| a| c| 1|
| a| f| 1|
| c| f| 1|
| b| d| 2|
| a| e| 3|
+---+---+-----+
我在 pyspark 中使用 graphframes
进行某些图形类型的分析,想知道从顶点数据框创建边列表数据框的最佳方法是什么。
例如,下面是我的顶点数据框。我有一个 ID 列表,它们属于不同的组。
+---+-----+
|id |group|
+---+-----+
|a |1 |
|b |2 |
|c |1 |
|d |2 |
|e |3 |
|a |3 |
|f |1 |
+---+-----+
我的objective是创建一个边缘列表数据框来指示出现在公共组中的id。 请注意,1 个 id 可能出现在多个组中(例如,上面的 id a 在第 1 组和第 3 组中)。 下面是我想要获取的边缘列表数据框:
+---+-----+-----+
|src|dst |group|
+---+-----+-----+
|a |c |1 |
|a |f |1 |
|c |f |1 |
|b |d |2 |
|a |e |3 |
+---+-----+-----+
提前致谢!
编辑 1
不确定这是否是更好的解决方法,但我做了一个解决方法:
import pyspark.sql.functions as f
df = df.withColumn('match', f.collect_set('id').over(Window.partitionBy('group')))
df = df.select(f.col('id').alias('src'),
f.explode('match').alias('dst'),
f.col('group'))
df = df.withColumn('duplicate_edges', f.array_sort(f.array('src', 'dst')))
df = (df
.where(f.col('src') != f.col('dst'))
.drop_duplicates(subset=['duplicate_edges'])
.drop('duplicate_edges'))
df.sort('group', 'src', 'dst').show()
输出
+---+---+-----+
|src|dst|group|
+---+---+-----+
| a| c| 1|
| a| f| 1|
| c| f| 1|
| b| d| 2|
| e| a| 3|
+---+---+-----+
原回答
试试这个:
import pyspark.sql.functions as f
df = (df
.groupby('group')
.agg(f.first('id').alias('src'),
f.last('id').alias('dst')))
df.show()
输出:
+-----+---+---+
|group|src|dst|
+-----+---+---+
| 1| a| c|
| 3| e| a|
| 2| b| d|
+-----+---+---+
您可以进行自我加入:
df = df.toDF('src', 'group')
df2 = df.toDF('dst', 'group2')
result = df.join(
df2,
(df.group == df2.group2) & (df.src < df2.dst)
).select('src', 'dst', 'group').distinct().orderBy('group', 'src', 'dst')
result.show()
+---+---+-----+
|src|dst|group|
+---+---+-----+
| a| c| 1|
| a| f| 1|
| c| f| 1|
| b| d| 2|
| a| e| 3|
+---+---+-----+