为什么在 Spark GraphX 中执行 Pregel 时出现 TypeMismatch 错误?
Why I get TypeMismatch error in performing Pregel in Spark GraphX?
我已经在 Spark GraphX 中使用 Pregel 编写了我的算法。但不幸的是我得到了 TypeMismatch 错误。
我加载图表:val my_graph= GraphLoader.edgeListFile(sc, path)
。所以一开始节点的结构如下:
(1,1)
(2,1)
(3,1)
以 nodeID 为键,默认值为 1 属性。
在 run2
函数中,首先我更改了结构,以便使每个节点都可以存储多个属性。因为我正在研究重叠社区检测算法,所以属性是标签和它们的分数。
在run2
的第运行处,每个节点的结构如:
(34,Map(34 -> (1.0,34)))
(13,Map(13 -> (1.0,13)))
(4,Map(4 -> (1.0,4)))
(16,Map(16 -> (1.0,16)))
(22,Map(22 -> (1.0,22)))
这意味着节点 34 具有标签 34 并且它的分数等于 1。然后每个节点可以存储从其邻居接收的几个属性,并且在接下来的步骤中它可以将它们发送给它的邻居。
在算法结束时,每个节点可以包含多个属性或仅包含一个 属性,例如以下结构:
(1,Map((2->(0.49,1),(8->(0.9,1)),(13->(0.79,1))))
(2,Map((11->(0.89,2)),(6->(0.68,2)),(13->(0.79,2)),(10->(0.57,2))))
(3,Map((20->(0.0.8,3)),(1->(0.66,3))))
上面的结构表明,例如,节点1属于社区2,得分为0.49,属于社区8,得分为0.9,属于社区13,得分为0.79。
以下代码显示了 Pregel 中定义的不同函数。
def run2[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {
val temp_graph = graph.mapVertices { case (vid, _) => mutable.HashMap[VertexId, (Double,VertexId)](vid -> (1,vid)) }
def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, mutable.HashMap[VertexId, (Double, VertexId)])] = {
Iterator((e.srcId,e.dstAttr), (e.dstId,e.srcAttr))
}
def mergeMessage(count1: (mutable.HashMap[VertexId, (Double,VertexId)]), count2: (mutable.HashMap[VertexId, (Double,VertexId)]))= {
val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]
(count1.keySet ++ count2.keySet).map(key => {
val count1Val = count1.getOrElse(key, (0D,0:VertexId))
val count2Val = count2.getOrElse(key, (0D,0:VertexId))
communityMap += key->(count1Val::communityMap(key))
communityMap += key->(count2Val::communityMap(key))
})
communityMap
}
def vertexProgram(vid: VertexId, attr: mutable.HashMap[VertexId,(Double, VertexId)], message: mutable.HashMap[VertexId, List[(Double, VertexId)]]) = {
if (message.isEmpty)
attr
else {
val labels_score: mutable.HashMap[VertexId, Double] = message.map {
key =>
var value_sum = 0D
var isMemberFlag = 0
var maxSimilar_result = 0D
val max_similar = most_similar.filter(x=>x._1==vid)(1)
if (key._2.exists(x=>x._2==max_similar)) isMemberFlag = 1 else isMemberFlag = 0
key._2.map {
values =>
if (values._2==max_similar) maxSimilar_result = values._1 else maxSimilar_result = 0D
val temp = broadcastVariable.value(vid)(values._2)._2
value_sum += values._1 * temp
}
value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)
(key._1,value_sum) //label list
}
val max_value = labels_score.maxBy(x=>x._2)._2.toDouble
val dividedByMax = labels_score.map(x=>(x._1,x._2/max_value)) // divide by maximum value
val resultMap: mutable.HashMap[VertexId,Double] = new mutable.HashMap[VertexId, Double]
dividedByMax.foreach{ row => // select labels more than threshold P = 0.5
if (row._2 >= p) resultMap += row
}
val max_for_normalize= resultMap.values.sum
val res = resultMap.map(x=>(x._1->(x._2/max_for_normalize,x._1))) // Normalize labels
res
}
}
val initialMessage = mutable.HashMap[VertexId, (Double,VertexId)]()
val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
vprog = vertexProgram,
sendMsg = sendMessage,
mergeMsg = mergeMessage)
overlapCommunitiesGraph
}
val my_graph= GraphLoader.edgeListFile(sc, path)
val new_updated_graph2 = run2(my_graph, 1)
在上面的代码中,p=0.5
和beta=0.5
。 most_similar
是一个RDD,包含每个节点和它最重要的节点。例如(1,3)
表示节点3是节点1最相似的邻居。broadcatVariable
结构如下:
(19,Map(33 -> (1.399158675718661,0.6335049099178383), 34 -> (1.4267350687130098,0.6427405501408145)))
(15,Map(33 -> (1.399158675718661,0.6335049099178383), 34 -> (1.4267350687130098,0.6427405501408145)))
...
该结构显示了一个节点作为键和它的邻居作为值之间的关系。例如,节点19与节点33和34是邻居,关系由它们之间的分数表示。
在该算法中,每个节点发送每个 属性,即 Map
,其中包含多个标签及其分数。然后在 mergeMessage
函数中,将具有相同编号的标签的值放入 List
中,并在 vertexProgram
中为每个标签或键处理其列表。
已更新
根据下图中的等式,我使用 List
为 Label 收集不同的分数,并在 vertexProgram
函数中处理它们。因为我需要P_ji
来处理每个节点的标签分数,所以我不知道是否可以在mergeMessage
函数中执行,或者是否需要在vertexProgram
中执行。 P_ji
是源节点与其邻居之间的分数,应该乘以标签分数。
我得到的错误显示在 vprog = vertexProgram,
行的前面,也显示在这张图片中。谁能帮我解决这个错误?
主要问题是您使用了两种不同类型的消息。初始消息的类型为 mutable.HashMap[VertexId, (Double,VertexId)]
,但在合并两个消息后(使用 mergeMessage
函数)类型变为 mutable.HashMap[VertexId, List[(Double,VertexId)]]
。这里的问题是,由于类型错误,现在合并的消息无法与另一条消息合并。
有两种方法可以解决这个问题:
- 将消息类型更改为
mutable.HashMap[VertexId, List[(Double,VertexId)]]
,确保初始消息与此匹配。
- 保持消息类型为
mutable.HashMap[VertexId, (Double,VertexId)]
并更改 mergeMessage
的输出类型以匹配。
下面是关于这两个选项的可能解决方案的一些草图。它们内部可能存在一些错误,因为实际需要的逻辑不是很清楚(代码中有一些未使用的变量等)。当与代码的其余部分结合时,这两个选项都可以 运行 并且将 return 一张新图。
解决方案 1:
您需要调整 sendMessage
、mergeMessage
和 initialMessage
来处理列表。这可以按如下方式完成:
def sendMessage(e: EdgeTriplet[Map[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, Map[VertexId, List[(Double, VertexId)]])] = {
val msg1 = e.dstAttr.map{ case (k,v) => (k, List(v)) }
val msg2 = e.srcAttr.map{ case (k,v) => (k, List(v)) }
Iterator((e.srcId, msg1), (e.dstId, msg2))
}
def mergeMessage(count1: Map[VertexId, List[(Double,VertexId)]], count2: Map[VertexId, List[(Double,VertexId)]])= {
val merged = count1.toSeq ++ count2.toSeq
val new_message = merged.groupBy(_._1).map{case (k,v) => (k, v.map(_._2).flatten.toList)}
new_message
}
val initialMessage = Map[VertexId, List[(Double,VertexId)]]()
可能vertexProgram
中的messages.isEmpty
return也需要调整
解决方案 2:
要使用没有列表的消息,您需要将合并逻辑从 vertexProgram
移动到 mergeMessage
。我稍微简化了代码,因此代码可能需要一些测试。
def mergeMessage(count1: (Map[VertexId, (Double, VertexId)]), count2: (Map[VertexId, (Double, VertexId)]))= {
val merged = count1.toSeq ++ count2.toSeq
val grouped = merged.groupBy(_._1)
val new_message = grouped.map{ case (key, key_values) =>
val values = key_values.map(_._2)
val max_similar = most_similar.filter(x => x._1 == key).headOption match {
case Some(x) => x
case _ => -1 // What should happen when there is no match?
}
val maxSimilar_result = values.filter(v => v._2 == max_similar).headOption match {
case Some(x) => x._1
case _ => 0.0
}
val value_sum = values.map{ v => v._1 * broadcastVariable.value(key)(v._2)._2}.sum
val res = (beta*value_sum)+((1-beta)*maxSimilar_result)
(key, (res, key))
}
new_message.toMap
}
def vertexProgram(vid: VertexId, attr: Map[VertexId, (Double, VertexId)], messages: Map[VertexId, (Double, VertexId)]) = {
if (messages.isEmpty){
attr
} else {
val labels_score = messages.map(m => (m._1, m._2._1))
val max_value = labels_score.maxBy(x => x._2)._2.toDouble
val dividedByMax = labels_score.map(x => (x._1, x._2 / max_value)) // divide by maximum value
// select labels more than threshold P = 0.5
val resultMap = dividedByMax.filter{ row => row._2 >= p }
val max_for_normalize= resultMap.values.sum
val res = resultMap.map(x => (x._1 -> (x._2 / max_for_normalize, x._1))) // Normalize labels
res
}
}
备注:
- 目前在
sendMessage
,一条消息被发送到两个节点,与图边的方向无关。这是否正确取决于所需的逻辑。
- 我将
mutable.HashMap
更改为正常(不可变)Map
。如果可能,总是首选使用不可变选项。
- 解决方案 1 应该更易于使用,因为
vertexProgram
中的逻辑非常复杂。还有一些变量目前没有做任何事情,但也许以后会用到。如果无法以迭代方式合并消息(并且您需要一次查看所有消息),那么使用 List
将是可行的方法。
我已经在 Spark GraphX 中使用 Pregel 编写了我的算法。但不幸的是我得到了 TypeMismatch 错误。
我加载图表:val my_graph= GraphLoader.edgeListFile(sc, path)
。所以一开始节点的结构如下:
(1,1)
(2,1)
(3,1)
以 nodeID 为键,默认值为 1 属性。
在 run2
函数中,首先我更改了结构,以便使每个节点都可以存储多个属性。因为我正在研究重叠社区检测算法,所以属性是标签和它们的分数。
在run2
的第运行处,每个节点的结构如:
(34,Map(34 -> (1.0,34)))
(13,Map(13 -> (1.0,13)))
(4,Map(4 -> (1.0,4)))
(16,Map(16 -> (1.0,16)))
(22,Map(22 -> (1.0,22)))
这意味着节点 34 具有标签 34 并且它的分数等于 1。然后每个节点可以存储从其邻居接收的几个属性,并且在接下来的步骤中它可以将它们发送给它的邻居。
在算法结束时,每个节点可以包含多个属性或仅包含一个 属性,例如以下结构:
(1,Map((2->(0.49,1),(8->(0.9,1)),(13->(0.79,1))))
(2,Map((11->(0.89,2)),(6->(0.68,2)),(13->(0.79,2)),(10->(0.57,2))))
(3,Map((20->(0.0.8,3)),(1->(0.66,3))))
上面的结构表明,例如,节点1属于社区2,得分为0.49,属于社区8,得分为0.9,属于社区13,得分为0.79。
以下代码显示了 Pregel 中定义的不同函数。
def run2[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {
val temp_graph = graph.mapVertices { case (vid, _) => mutable.HashMap[VertexId, (Double,VertexId)](vid -> (1,vid)) }
def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, mutable.HashMap[VertexId, (Double, VertexId)])] = {
Iterator((e.srcId,e.dstAttr), (e.dstId,e.srcAttr))
}
def mergeMessage(count1: (mutable.HashMap[VertexId, (Double,VertexId)]), count2: (mutable.HashMap[VertexId, (Double,VertexId)]))= {
val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]
(count1.keySet ++ count2.keySet).map(key => {
val count1Val = count1.getOrElse(key, (0D,0:VertexId))
val count2Val = count2.getOrElse(key, (0D,0:VertexId))
communityMap += key->(count1Val::communityMap(key))
communityMap += key->(count2Val::communityMap(key))
})
communityMap
}
def vertexProgram(vid: VertexId, attr: mutable.HashMap[VertexId,(Double, VertexId)], message: mutable.HashMap[VertexId, List[(Double, VertexId)]]) = {
if (message.isEmpty)
attr
else {
val labels_score: mutable.HashMap[VertexId, Double] = message.map {
key =>
var value_sum = 0D
var isMemberFlag = 0
var maxSimilar_result = 0D
val max_similar = most_similar.filter(x=>x._1==vid)(1)
if (key._2.exists(x=>x._2==max_similar)) isMemberFlag = 1 else isMemberFlag = 0
key._2.map {
values =>
if (values._2==max_similar) maxSimilar_result = values._1 else maxSimilar_result = 0D
val temp = broadcastVariable.value(vid)(values._2)._2
value_sum += values._1 * temp
}
value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)
(key._1,value_sum) //label list
}
val max_value = labels_score.maxBy(x=>x._2)._2.toDouble
val dividedByMax = labels_score.map(x=>(x._1,x._2/max_value)) // divide by maximum value
val resultMap: mutable.HashMap[VertexId,Double] = new mutable.HashMap[VertexId, Double]
dividedByMax.foreach{ row => // select labels more than threshold P = 0.5
if (row._2 >= p) resultMap += row
}
val max_for_normalize= resultMap.values.sum
val res = resultMap.map(x=>(x._1->(x._2/max_for_normalize,x._1))) // Normalize labels
res
}
}
val initialMessage = mutable.HashMap[VertexId, (Double,VertexId)]()
val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
vprog = vertexProgram,
sendMsg = sendMessage,
mergeMsg = mergeMessage)
overlapCommunitiesGraph
}
val my_graph= GraphLoader.edgeListFile(sc, path)
val new_updated_graph2 = run2(my_graph, 1)
在上面的代码中,p=0.5
和beta=0.5
。 most_similar
是一个RDD,包含每个节点和它最重要的节点。例如(1,3)
表示节点3是节点1最相似的邻居。broadcatVariable
结构如下:
(19,Map(33 -> (1.399158675718661,0.6335049099178383), 34 -> (1.4267350687130098,0.6427405501408145)))
(15,Map(33 -> (1.399158675718661,0.6335049099178383), 34 -> (1.4267350687130098,0.6427405501408145)))
...
该结构显示了一个节点作为键和它的邻居作为值之间的关系。例如,节点19与节点33和34是邻居,关系由它们之间的分数表示。
在该算法中,每个节点发送每个 属性,即 Map
,其中包含多个标签及其分数。然后在 mergeMessage
函数中,将具有相同编号的标签的值放入 List
中,并在 vertexProgram
中为每个标签或键处理其列表。
已更新
根据下图中的等式,我使用 List
为 Label 收集不同的分数,并在 vertexProgram
函数中处理它们。因为我需要P_ji
来处理每个节点的标签分数,所以我不知道是否可以在mergeMessage
函数中执行,或者是否需要在vertexProgram
中执行。 P_ji
是源节点与其邻居之间的分数,应该乘以标签分数。
我得到的错误显示在 vprog = vertexProgram,
行的前面,也显示在这张图片中。谁能帮我解决这个错误?
主要问题是您使用了两种不同类型的消息。初始消息的类型为 mutable.HashMap[VertexId, (Double,VertexId)]
,但在合并两个消息后(使用 mergeMessage
函数)类型变为 mutable.HashMap[VertexId, List[(Double,VertexId)]]
。这里的问题是,由于类型错误,现在合并的消息无法与另一条消息合并。
有两种方法可以解决这个问题:
- 将消息类型更改为
mutable.HashMap[VertexId, List[(Double,VertexId)]]
,确保初始消息与此匹配。 - 保持消息类型为
mutable.HashMap[VertexId, (Double,VertexId)]
并更改mergeMessage
的输出类型以匹配。
下面是关于这两个选项的可能解决方案的一些草图。它们内部可能存在一些错误,因为实际需要的逻辑不是很清楚(代码中有一些未使用的变量等)。当与代码的其余部分结合时,这两个选项都可以 运行 并且将 return 一张新图。
解决方案 1:
您需要调整 sendMessage
、mergeMessage
和 initialMessage
来处理列表。这可以按如下方式完成:
def sendMessage(e: EdgeTriplet[Map[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, Map[VertexId, List[(Double, VertexId)]])] = {
val msg1 = e.dstAttr.map{ case (k,v) => (k, List(v)) }
val msg2 = e.srcAttr.map{ case (k,v) => (k, List(v)) }
Iterator((e.srcId, msg1), (e.dstId, msg2))
}
def mergeMessage(count1: Map[VertexId, List[(Double,VertexId)]], count2: Map[VertexId, List[(Double,VertexId)]])= {
val merged = count1.toSeq ++ count2.toSeq
val new_message = merged.groupBy(_._1).map{case (k,v) => (k, v.map(_._2).flatten.toList)}
new_message
}
val initialMessage = Map[VertexId, List[(Double,VertexId)]]()
可能vertexProgram
中的messages.isEmpty
return也需要调整
解决方案 2:
要使用没有列表的消息,您需要将合并逻辑从 vertexProgram
移动到 mergeMessage
。我稍微简化了代码,因此代码可能需要一些测试。
def mergeMessage(count1: (Map[VertexId, (Double, VertexId)]), count2: (Map[VertexId, (Double, VertexId)]))= {
val merged = count1.toSeq ++ count2.toSeq
val grouped = merged.groupBy(_._1)
val new_message = grouped.map{ case (key, key_values) =>
val values = key_values.map(_._2)
val max_similar = most_similar.filter(x => x._1 == key).headOption match {
case Some(x) => x
case _ => -1 // What should happen when there is no match?
}
val maxSimilar_result = values.filter(v => v._2 == max_similar).headOption match {
case Some(x) => x._1
case _ => 0.0
}
val value_sum = values.map{ v => v._1 * broadcastVariable.value(key)(v._2)._2}.sum
val res = (beta*value_sum)+((1-beta)*maxSimilar_result)
(key, (res, key))
}
new_message.toMap
}
def vertexProgram(vid: VertexId, attr: Map[VertexId, (Double, VertexId)], messages: Map[VertexId, (Double, VertexId)]) = {
if (messages.isEmpty){
attr
} else {
val labels_score = messages.map(m => (m._1, m._2._1))
val max_value = labels_score.maxBy(x => x._2)._2.toDouble
val dividedByMax = labels_score.map(x => (x._1, x._2 / max_value)) // divide by maximum value
// select labels more than threshold P = 0.5
val resultMap = dividedByMax.filter{ row => row._2 >= p }
val max_for_normalize= resultMap.values.sum
val res = resultMap.map(x => (x._1 -> (x._2 / max_for_normalize, x._1))) // Normalize labels
res
}
}
备注:
- 目前在
sendMessage
,一条消息被发送到两个节点,与图边的方向无关。这是否正确取决于所需的逻辑。 - 我将
mutable.HashMap
更改为正常(不可变)Map
。如果可能,总是首选使用不可变选项。 - 解决方案 1 应该更易于使用,因为
vertexProgram
中的逻辑非常复杂。还有一些变量目前没有做任何事情,但也许以后会用到。如果无法以迭代方式合并消息(并且您需要一次查看所有消息),那么使用List
将是可行的方法。