使用 MapReduce 高效地将边列表转换为邻接列表
Efficiently convert edge list to adjacency list using MapReduce
我有一个表示树的有向边列表。 'u v' 表示你是 v.
的 child
sc = SparkContext(conf = conf)
lines = sc.textFile("/content/sample_data/data.txt")
lines.take(10)
['0 0', '1 0', '2 0', '3 1', '4 1', '5 2', '6 2', '7 3', '8 3', '9 4']
我把上面的转换成下面的形式存储为intermediate
:
[(0, ('out', 0)),
(0, ('in', 0)),
(1, ('out', 0)),
(0, ('in', 1)),...]
我正在尝试根据上面的形式构建邻接表:
[(8721, [('out', 4360), ('in', 17443), ('in', 17444)]),
(9291, [('out', 4645), ('in', 18583), ('in', 18584)]),
(9345, [('out', 4672), ('in', 18691), ('in', 18692)]),..]
在这里,第一行告诉我们 8721 是 4360 的 child 而 [17443, 17444] 是 8721children
我正在使用 Spark 模块公开的 groupByKey
或 reduceByKey
方法。
intermediate.groupByKey().mapValues(list)
上一行花费了很多时间。在具有 12 GB RAM 的 8 核机器上,100 MB 的测试数据需要将近 250 秒。我最终必须在分布式环境中为 >15GB 的数据部署它。
我了解 groupByKey 会导致跨所有节点的数据混洗。在我的情况下有什么办法可以避免吗?
对如何优化此操作的任何建议表示赞赏。
在对数据集的行进行分组时,您无法避免随机播放。但是,您可以使用数据框 API 而不是 RDD。 Dataframe API 比 RDD 更高效,参见
如果你的txt文件格式如下:
0 0
0 1
...
然后你就可以把它当作数据帧来读了。
df = spark.read.csv('test.txt', sep=' ')
df.show()
+---+---+
|_c0|_c1|
+---+---+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 1|
| 4| 1|
| 5| 2|
| 6| 2|
| 7| 3|
| 8| 3|
| 9| 4|
+---+---+
通过附加类型列进行交叉连接或联合:
df2 = df.withColumn('_c2', f.lit('in')).unionAll(df.select('_c1', '_c0').withColumn('_c2', f.lit('out')))
df2.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
| 0| 0| in|
| 1| 0| in|
| 2| 0| in|
| 3| 1| in|
| 4| 1| in|
| 5| 2| in|
| 6| 2| in|
| 7| 3| in|
| 8| 3| in|
| 9| 4| in|
| 0| 0|out|
| 0| 1|out|
| 0| 2|out|
| 1| 3|out|
| 1| 4|out|
| 2| 5|out|
| 2| 6|out|
| 3| 7|out|
| 3| 8|out|
| 4| 9|out|
+---+---+---+
和groupBy
结果。
df3 = df2.groupBy('_c0').agg(f.collect_list(f.array('_c2','_c1'))).toDF('edge', 'list')
df3.show(truncate=False)
df3.printSchema()
+----+---------------------------------------+
|edge|list |
+----+---------------------------------------+
|7 |[[in, 3]] |
|3 |[[in, 1], [out, 7], [out, 8]] |
|8 |[[in, 3]] |
|0 |[[in, 0], [out, 0], [out, 1], [out, 2]]|
|5 |[[in, 2]] |
|6 |[[in, 2]] |
|9 |[[in, 4]] |
|1 |[[in, 0], [out, 3], [out, 4]] |
|4 |[[in, 1], [out, 9]] |
|2 |[[in, 0], [out, 5], [out, 6]] |
+----+---------------------------------------+
root
|-- edge: string (nullable = true)
|-- list: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: string (containsNull = true)
我有一个表示树的有向边列表。 'u v' 表示你是 v.
的 childsc = SparkContext(conf = conf)
lines = sc.textFile("/content/sample_data/data.txt")
lines.take(10)
['0 0', '1 0', '2 0', '3 1', '4 1', '5 2', '6 2', '7 3', '8 3', '9 4']
我把上面的转换成下面的形式存储为intermediate
:
[(0, ('out', 0)),
(0, ('in', 0)),
(1, ('out', 0)),
(0, ('in', 1)),...]
我正在尝试根据上面的形式构建邻接表:
[(8721, [('out', 4360), ('in', 17443), ('in', 17444)]),
(9291, [('out', 4645), ('in', 18583), ('in', 18584)]),
(9345, [('out', 4672), ('in', 18691), ('in', 18692)]),..]
在这里,第一行告诉我们 8721 是 4360 的 child 而 [17443, 17444] 是 8721children
我正在使用 Spark 模块公开的 groupByKey
或 reduceByKey
方法。
intermediate.groupByKey().mapValues(list)
上一行花费了很多时间。在具有 12 GB RAM 的 8 核机器上,100 MB 的测试数据需要将近 250 秒。我最终必须在分布式环境中为 >15GB 的数据部署它。
我了解 groupByKey 会导致跨所有节点的数据混洗。在我的情况下有什么办法可以避免吗? 对如何优化此操作的任何建议表示赞赏。
在对数据集的行进行分组时,您无法避免随机播放。但是,您可以使用数据框 API 而不是 RDD。 Dataframe API 比 RDD 更高效,参见
如果你的txt文件格式如下:
0 0
0 1
...
然后你就可以把它当作数据帧来读了。
df = spark.read.csv('test.txt', sep=' ')
df.show()
+---+---+
|_c0|_c1|
+---+---+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 1|
| 4| 1|
| 5| 2|
| 6| 2|
| 7| 3|
| 8| 3|
| 9| 4|
+---+---+
通过附加类型列进行交叉连接或联合:
df2 = df.withColumn('_c2', f.lit('in')).unionAll(df.select('_c1', '_c0').withColumn('_c2', f.lit('out')))
df2.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
| 0| 0| in|
| 1| 0| in|
| 2| 0| in|
| 3| 1| in|
| 4| 1| in|
| 5| 2| in|
| 6| 2| in|
| 7| 3| in|
| 8| 3| in|
| 9| 4| in|
| 0| 0|out|
| 0| 1|out|
| 0| 2|out|
| 1| 3|out|
| 1| 4|out|
| 2| 5|out|
| 2| 6|out|
| 3| 7|out|
| 3| 8|out|
| 4| 9|out|
+---+---+---+
和groupBy
结果。
df3 = df2.groupBy('_c0').agg(f.collect_list(f.array('_c2','_c1'))).toDF('edge', 'list')
df3.show(truncate=False)
df3.printSchema()
+----+---------------------------------------+
|edge|list |
+----+---------------------------------------+
|7 |[[in, 3]] |
|3 |[[in, 1], [out, 7], [out, 8]] |
|8 |[[in, 3]] |
|0 |[[in, 0], [out, 0], [out, 1], [out, 2]]|
|5 |[[in, 2]] |
|6 |[[in, 2]] |
|9 |[[in, 4]] |
|1 |[[in, 0], [out, 3], [out, 4]] |
|4 |[[in, 1], [out, 9]] |
|2 |[[in, 0], [out, 5], [out, 6]] |
+----+---------------------------------------+
root
|-- edge: string (nullable = true)
|-- list: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: string (containsNull = true)