Apache-Spark Graph-frame 在 BFS 上非常慢
Apache-Spark Graph-frame is very slow on BFS
我在以下代码中使用 Scala 的 Apache Spark-GraphFrames,我在上面的代码上应用 BFS 并尝试找到顶点 0 到 100 之间的距离。
import org.apache.spark._
import org.graphframes._
import org.graphframes.GraphFrame
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object SimpApp{
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SimpApp")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val nodesList = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("CSV File Path")
val edgesList= sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("CSV File Path")
val v=nodesList.toDF("id")
val e=edgesList.toDF("src", "dst", "dist")
val g = GraphFrame(v, e)
var paths: DataFrame = g.bfs.fromExpr("id = 0").toExpr(s"id = 100").maxPathLength(101).run()
paths.show()
sc.stop()
}
}
源Node:0目的地Node:100
顶点列表如下
id
0
1
2
3
.
.
.
up to
1000
这是边列表
src dst dist
0 1 2
1, 2, 1
2, 3, 5
3, 4, 1
4, 5, 3
5, 6, 3
6, 7, 6
. . .
. . .
. . .
up to
999, 998, 4
但是上面给出的代码的问题是,仅执行 0 到 100 个顶点就需要花费大量时间,因为它是 运行 4 小时但没有输出。
上面的代码我 运行 在具有 12 GB RAM 的单机上。
能否请您指导我加快和优化代码。
为了验证,我认为您正在尝试为图形的未加权边缘找到最短距离,因此使用了 BFS。在这种情况下,您可能希望从查询中删除 maxPathLength(101)
,以便其:
g.bfs.fromExpr("id = 0").toExpr("id = 100").run()
如 BFS definition 中所述:
maxPathLength
is the limit on the length of paths with a default of
10. If no valid paths of length <= maxPathLength are found, then the BFS is terminated.
通过在顶点 0 和顶点 100 之间指定 101,它将尝试找到从 0 到 100 的所有边,这些边的长度为 101,因此需要大量迭代。
BFS 和最短距离的一个有趣示例可以在有关航班的经典图形场景中描述(参考:On-Time Flight Performance with GraphFrames for Apache Spark),其中顶点(或节点)是机场,而边是机场之间的航班机场。
如果您要查找 SFO
(旧金山)和 BUF
(布法罗)之间的直飞航班,BFS 查询将是:
tripGraph.bfs.fromExpr("id = 'SFO'").toExpr("id = 'BUF').maxPathLength(1).run
正如所引用的 link 中所述,没有直飞航班,因此没有结果。但是,如果您将 maxPathLength
增加到 2(即 SFO
和 BUF
节点之间的一个额外节点),那么您会发现许多路径(例如 SFO
> BOS
> BUF
或旧金山到波士顿到布法罗)
tripGraph.bfs.fromExpr("id = 'SFO'").toExpr("id = 'BUF').maxPathLength(2).run
我在以下代码中使用 Scala 的 Apache Spark-GraphFrames,我在上面的代码上应用 BFS 并尝试找到顶点 0 到 100 之间的距离。
import org.apache.spark._
import org.graphframes._
import org.graphframes.GraphFrame
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object SimpApp{
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SimpApp")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val nodesList = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("CSV File Path")
val edgesList= sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("CSV File Path")
val v=nodesList.toDF("id")
val e=edgesList.toDF("src", "dst", "dist")
val g = GraphFrame(v, e)
var paths: DataFrame = g.bfs.fromExpr("id = 0").toExpr(s"id = 100").maxPathLength(101).run()
paths.show()
sc.stop()
}
}
源Node:0目的地Node:100
顶点列表如下
id
0
1
2
3
.
.
.
up to
1000
这是边列表
src dst dist
0 1 2
1, 2, 1
2, 3, 5
3, 4, 1
4, 5, 3
5, 6, 3
6, 7, 6
. . .
. . .
. . .
up to
999, 998, 4
但是上面给出的代码的问题是,仅执行 0 到 100 个顶点就需要花费大量时间,因为它是 运行 4 小时但没有输出。 上面的代码我 运行 在具有 12 GB RAM 的单机上。
能否请您指导我加快和优化代码。
为了验证,我认为您正在尝试为图形的未加权边缘找到最短距离,因此使用了 BFS。在这种情况下,您可能希望从查询中删除 maxPathLength(101)
,以便其:
g.bfs.fromExpr("id = 0").toExpr("id = 100").run()
如 BFS definition 中所述:
maxPathLength
is the limit on the length of paths with a default of 10. If no valid paths of length <= maxPathLength are found, then the BFS is terminated.
通过在顶点 0 和顶点 100 之间指定 101,它将尝试找到从 0 到 100 的所有边,这些边的长度为 101,因此需要大量迭代。
BFS 和最短距离的一个有趣示例可以在有关航班的经典图形场景中描述(参考:On-Time Flight Performance with GraphFrames for Apache Spark),其中顶点(或节点)是机场,而边是机场之间的航班机场。
如果您要查找 SFO
(旧金山)和 BUF
(布法罗)之间的直飞航班,BFS 查询将是:
tripGraph.bfs.fromExpr("id = 'SFO'").toExpr("id = 'BUF').maxPathLength(1).run
正如所引用的 link 中所述,没有直飞航班,因此没有结果。但是,如果您将 maxPathLength
增加到 2(即 SFO
和 BUF
节点之间的一个额外节点),那么您会发现许多路径(例如 SFO
> BOS
> BUF
或旧金山到波士顿到布法罗)
tripGraph.bfs.fromExpr("id = 'SFO'").toExpr("id = 'BUF').maxPathLength(2).run