Spark,Graphx 程序不使用 cpu 和内存
Spark,Graphx program does not utilize cpu and memory
我有一个获取节点邻居的函数,对于我使用广播变量和节点本身的 id 的邻居,它计算 node.I 映射图中每个节点的接近中心性结果 function.When 我打开任务管理器 cpu 根本没有被使用,就好像它没有并行工作一样,内存也是如此,但是每个节点都并行执行该功能而且数据很大,需要时间才能完成,它不像不需要 resources.Every 的帮助,非常感谢,谢谢。
为了加载图表,我使用 val graph = GraphLoader.edgeListFile(sc, path).cache
object ClosenessCentrality {
case class Vertex(id: VertexId)
def run(graph: Graph[Int, Float],sc: SparkContext): Unit = {
//Have to reverse edges and make graph undirected because is bipartite
val neighbors = CollectNeighbors.collectWeightedNeighbors(graph).collectAsMap()
val bNeighbors = sc.broadcast(neighbors)
val result = graph.vertices.map(f => shortestPaths(f._1,bNeighbors.value))
//result.coalesce(1)
result.count()
}
def shortestPaths(source: VertexId, neighbors: Map[VertexId, Map[VertexId, Float]]): Double ={
val predecessors = new mutable.HashMap[VertexId, ListBuffer[VertexId]]()
val distances = new mutable.HashMap[VertexId, Double]()
val q = new FibonacciHeap[Vertex]
val nodes = new mutable.HashMap[VertexId, FibonacciHeap.Node[Vertex]]()
distances.put(source, 0)
for (w <- neighbors) {
if (w._1 != source)
distances.put(w._1, Int.MaxValue)
predecessors.put(w._1, ListBuffer[VertexId]())
val node = q.insert(Vertex(w._1), distances(w._1))
nodes.put(w._1, node)
}
while (!q.isEmpty) {
val u = q.minNode
val node = u.data.id
q.removeMin()
//discover paths
//println("Current node is:"+node+" "+neighbors(node).size)
for (w <- neighbors(node).keys) {
//print("Neighbor is"+w)
val alt = distances(node) + neighbors(node)(w)
// if (distances(w) > alt) {
// distances(w) = alt
// q.decreaseKey(nodes(w), alt)
// }
// if (distances(w) == alt)
// predecessors(w).+=(node)
if(alt< distances(w)){
distances(w) = alt
predecessors(w).+=(node)
q.decreaseKey(nodes(w), alt)
}
}//For
}
val sum = distances.values.sum
sum
}
为了对你原来的问题提供一些答案,我怀疑你的 RDD 只有一个分区,因此使用一个核心来处理。
edgeListFile
方法有一个参数来指定所需的最小分区数。
此外,您可以使用 repartition
来获得更多分区。
您提到了 coalesce
,但默认情况下只会减少分区数,请参阅此问题:
我有一个获取节点邻居的函数,对于我使用广播变量和节点本身的 id 的邻居,它计算 node.I 映射图中每个节点的接近中心性结果 function.When 我打开任务管理器 cpu 根本没有被使用,就好像它没有并行工作一样,内存也是如此,但是每个节点都并行执行该功能而且数据很大,需要时间才能完成,它不像不需要 resources.Every 的帮助,非常感谢,谢谢。
为了加载图表,我使用 val graph = GraphLoader.edgeListFile(sc, path).cache
object ClosenessCentrality {
case class Vertex(id: VertexId)
def run(graph: Graph[Int, Float],sc: SparkContext): Unit = {
//Have to reverse edges and make graph undirected because is bipartite
val neighbors = CollectNeighbors.collectWeightedNeighbors(graph).collectAsMap()
val bNeighbors = sc.broadcast(neighbors)
val result = graph.vertices.map(f => shortestPaths(f._1,bNeighbors.value))
//result.coalesce(1)
result.count()
}
def shortestPaths(source: VertexId, neighbors: Map[VertexId, Map[VertexId, Float]]): Double ={
val predecessors = new mutable.HashMap[VertexId, ListBuffer[VertexId]]()
val distances = new mutable.HashMap[VertexId, Double]()
val q = new FibonacciHeap[Vertex]
val nodes = new mutable.HashMap[VertexId, FibonacciHeap.Node[Vertex]]()
distances.put(source, 0)
for (w <- neighbors) {
if (w._1 != source)
distances.put(w._1, Int.MaxValue)
predecessors.put(w._1, ListBuffer[VertexId]())
val node = q.insert(Vertex(w._1), distances(w._1))
nodes.put(w._1, node)
}
while (!q.isEmpty) {
val u = q.minNode
val node = u.data.id
q.removeMin()
//discover paths
//println("Current node is:"+node+" "+neighbors(node).size)
for (w <- neighbors(node).keys) {
//print("Neighbor is"+w)
val alt = distances(node) + neighbors(node)(w)
// if (distances(w) > alt) {
// distances(w) = alt
// q.decreaseKey(nodes(w), alt)
// }
// if (distances(w) == alt)
// predecessors(w).+=(node)
if(alt< distances(w)){
distances(w) = alt
predecessors(w).+=(node)
q.decreaseKey(nodes(w), alt)
}
}//For
}
val sum = distances.values.sum
sum
}
为了对你原来的问题提供一些答案,我怀疑你的 RDD 只有一个分区,因此使用一个核心来处理。
edgeListFile
方法有一个参数来指定所需的最小分区数。
此外,您可以使用 repartition
来获得更多分区。
您提到了 coalesce
,但默认情况下只会减少分区数,请参阅此问题: