使用 MapReduce 高效地将边列表转换为邻接列表

Efficiently convert edge list to adjacency list using MapReduce

我有一个表示树的有向边列表。 'u v' 表示你是 v.

的 child
sc = SparkContext(conf = conf)
lines = sc.textFile("/content/sample_data/data.txt")
lines.take(10)
['0 0', '1 0', '2 0', '3 1', '4 1', '5 2', '6 2', '7 3', '8 3', '9 4']

我把上面的转换成下面的形式存储为intermediate:

[(0, ('out', 0)),
 (0, ('in', 0)),
 (1, ('out', 0)),
 (0, ('in', 1)),...]

我正在尝试根据上面的形式构建邻接表:

[(8721, [('out', 4360), ('in', 17443), ('in', 17444)]),
 (9291, [('out', 4645), ('in', 18583), ('in', 18584)]),
 (9345, [('out', 4672), ('in', 18691), ('in', 18692)]),..]

在这里,第一行告诉我们 8721 是 4360 的 child 而 [17443, 17444] 是 8721children

我正在使用 Spark 模块公开的 groupByKeyreduceByKey 方法。

intermediate.groupByKey().mapValues(list)

上一行花费了很多时间。在具有 12 GB RAM 的 8 核机器上,100 MB 的测试数据需要将近 250 秒。我最终必须在分布式环境中为 >15GB 的数据部署它。

我了解 groupByKey 会导致跨所有节点的数据混洗。在我的情况下有什么办法可以避免吗? 对如何优化此操作的任何建议表示赞赏。

在对数据集的行进行分组时,您无法避免随机播放。但是,您可以使用数据框 API 而不是 RDD。 Dataframe API 比 RDD 更高效,参见

如果你的txt文件格式如下:

0 0
0 1
...

然后你就可以把它当作数据帧来读了。

df = spark.read.csv('test.txt', sep=' ')
df.show()

+---+---+
|_c0|_c1|
+---+---+
|  0|  0|
|  1|  0|
|  2|  0|
|  3|  1|
|  4|  1|
|  5|  2|
|  6|  2|
|  7|  3|
|  8|  3|
|  9|  4|
+---+---+

通过附加类型列进行交叉连接或联合:

df2 = df.withColumn('_c2', f.lit('in')).unionAll(df.select('_c1', '_c0').withColumn('_c2', f.lit('out')))
df2.show()

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  0|  0| in|
|  1|  0| in|
|  2|  0| in|
|  3|  1| in|
|  4|  1| in|
|  5|  2| in|
|  6|  2| in|
|  7|  3| in|
|  8|  3| in|
|  9|  4| in|
|  0|  0|out|
|  0|  1|out|
|  0|  2|out|
|  1|  3|out|
|  1|  4|out|
|  2|  5|out|
|  2|  6|out|
|  3|  7|out|
|  3|  8|out|
|  4|  9|out|
+---+---+---+

groupBy结果。

df3 = df2.groupBy('_c0').agg(f.collect_list(f.array('_c2','_c1'))).toDF('edge', 'list')
df3.show(truncate=False)
df3.printSchema()

+----+---------------------------------------+
|edge|list                                   |
+----+---------------------------------------+
|7   |[[in, 3]]                              |
|3   |[[in, 1], [out, 7], [out, 8]]          |
|8   |[[in, 3]]                              |
|0   |[[in, 0], [out, 0], [out, 1], [out, 2]]|
|5   |[[in, 2]]                              |
|6   |[[in, 2]]                              |
|9   |[[in, 4]]                              |
|1   |[[in, 0], [out, 3], [out, 4]]          |
|4   |[[in, 1], [out, 9]]                    |
|2   |[[in, 0], [out, 5], [out, 6]]          |
+----+---------------------------------------+

root
 |-- edge: string (nullable = true)
 |-- list: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)