使用 graphx 构建的图形未正确广播
Graph constructed using graphx is not getting broadcast-ed properly
我使用 graphx 创建了一个图,现在我需要从原始图中提取子图。 users_graph 是一个 RDD,它有一个索引到用户的子图。问题是这些子图没有得到计算。当我尝试对这些子图进行操作时出现 java.lang.NullPointerException 异常。
class VertexProperty(val id:Long) extends Serializable
case class User(val userId:Long, var offset:Int, val userCode:String, val Name:String, val Surname:String, val organizational_unit:String, val UME:String, val person_type:String, val SOD_HIGH:String, val SOD_MEDIUM:String, val SOD_LOW:String, val Under_mitigated:String) extends VertexProperty(userId)
case class Account(val accountId:Long, var offset:Int, val userCode:String, val userId:String, val account_creation_date:String, var disabled:String, var forcechangepwd:String, var pwdlife:String, var numberloginerror:String, var lastchangepwd:String, var lastlogin:String, var lastwronglogin:String, var state:String, var expire:String, var last_cert_time:String, var creation_date:String, var creation_user:String,var challenge_counter:String, var challenge_failed_attempt:String) extends VertexProperty(accountId) //Check if userCode is actually the code in this example.
case class Application(var applicationId:Long, var offset:Int, var Name:String, var Description:String, var Target:String, var Owner:String, var Ownercode:String, var Creation_date:String, var Creation_user:String) extends VertexProperty(applicationId)
case class Entitlement(val entitlementId:Long, var offset:Int, val Name:String, var Code:String, var Description:String, var Type:String, var Application:String, var Administrative:String, var Parent_ID:String, var Owner_code:String, var Scope_type:String, var Business_name:String, var Business_policy:String, var SOD_high:String, var SOD_medium:String, var SOD_low:String) extends VertexProperty(entitlementId)
/*
Some code for computing vertexRDD and edges
*/
val graph: Graph[VertexProperty,String] = Graph(vertexRDD, edges, new VertexProperty(-1))
val triplets = graph.triplets
val temp = triplets.map(t => t.attr)
val distinct_users = temp.distinct.filter(t => t != "NULL")
var bcast_graph = sc.broadcast(graph)
val users_graph = distinct_users.map(du => du -> bcast_graph.value.subgraph(epred = t => t.attr == du))
长话短说,您不能 broadcast
一个 Graph
,因为它包含一个 RDD
(实际上是两个)。并且您不能在 RDD
上的 map
函数中使用 Graph
,因为它由 RDDs
.
组成
就像我说的,这是一个很长的故事,为什么你不能做其中任何一个 - 它们实际上是同一枚硬币的两个面。无论哪种方式,您都面临同样的问题。
Spark 建立在主/从概念之上。它在主人的记忆 space 中定义了 RDDs
和与之相关的元操作。但是代码——map(...)
中的 ...
——在从属(称为执行程序)上执行。当您以任何方式引用不同的 RDD
时,您的地图代码不能 运行 在执行程序中 - 并且 broadcast
永远不会有帮助,因为 RDD
引用只能存在于master.
你可以做什么呢?您有两个选择:
- 使用
collect()
收集您需要的数据,然后 broadcast
该数据或仅在您的 map
代码中引用它。 collect()
将所有数据拉入 master,但对于您的问题最重要的是,这意味着您现在可以在不使用 RDD
引用的情况下引用数据,因此您可以将收集到的数据发送给执行者-- 通过使用 broadcast
或仅通过在您的 map(...)
代码中引用它们(Spark 会将数据的副本发送给您的执行者)。这些中的哪一个有效——或者两者是否有效——取决于你的数据大小、速度预期等。
- 使用
RDD.join()
或RDD.cogroup()
同时处理两个Graphs
。
由于您正在处理高阶结构 -- GraphX Graph
,因此这两者都很复杂。您将不得不分别处理单独的 Graph.vertices
和 Graph.edges
RDDs
,执行您的 collect()
或 join()
,然后重新构建最终的 Graph
通过拼接适当的 RDDs
.
我使用 graphx 创建了一个图,现在我需要从原始图中提取子图。 users_graph 是一个 RDD,它有一个索引到用户的子图。问题是这些子图没有得到计算。当我尝试对这些子图进行操作时出现 java.lang.NullPointerException 异常。
class VertexProperty(val id:Long) extends Serializable
case class User(val userId:Long, var offset:Int, val userCode:String, val Name:String, val Surname:String, val organizational_unit:String, val UME:String, val person_type:String, val SOD_HIGH:String, val SOD_MEDIUM:String, val SOD_LOW:String, val Under_mitigated:String) extends VertexProperty(userId)
case class Account(val accountId:Long, var offset:Int, val userCode:String, val userId:String, val account_creation_date:String, var disabled:String, var forcechangepwd:String, var pwdlife:String, var numberloginerror:String, var lastchangepwd:String, var lastlogin:String, var lastwronglogin:String, var state:String, var expire:String, var last_cert_time:String, var creation_date:String, var creation_user:String,var challenge_counter:String, var challenge_failed_attempt:String) extends VertexProperty(accountId) //Check if userCode is actually the code in this example.
case class Application(var applicationId:Long, var offset:Int, var Name:String, var Description:String, var Target:String, var Owner:String, var Ownercode:String, var Creation_date:String, var Creation_user:String) extends VertexProperty(applicationId)
case class Entitlement(val entitlementId:Long, var offset:Int, val Name:String, var Code:String, var Description:String, var Type:String, var Application:String, var Administrative:String, var Parent_ID:String, var Owner_code:String, var Scope_type:String, var Business_name:String, var Business_policy:String, var SOD_high:String, var SOD_medium:String, var SOD_low:String) extends VertexProperty(entitlementId)
/*
Some code for computing vertexRDD and edges
*/
val graph: Graph[VertexProperty,String] = Graph(vertexRDD, edges, new VertexProperty(-1))
val triplets = graph.triplets
val temp = triplets.map(t => t.attr)
val distinct_users = temp.distinct.filter(t => t != "NULL")
var bcast_graph = sc.broadcast(graph)
val users_graph = distinct_users.map(du => du -> bcast_graph.value.subgraph(epred = t => t.attr == du))
长话短说,您不能 broadcast
一个 Graph
,因为它包含一个 RDD
(实际上是两个)。并且您不能在 RDD
上的 map
函数中使用 Graph
,因为它由 RDDs
.
就像我说的,这是一个很长的故事,为什么你不能做其中任何一个 - 它们实际上是同一枚硬币的两个面。无论哪种方式,您都面临同样的问题。
Spark 建立在主/从概念之上。它在主人的记忆 space 中定义了 RDDs
和与之相关的元操作。但是代码——map(...)
中的 ...
——在从属(称为执行程序)上执行。当您以任何方式引用不同的 RDD
时,您的地图代码不能 运行 在执行程序中 - 并且 broadcast
永远不会有帮助,因为 RDD
引用只能存在于master.
你可以做什么呢?您有两个选择:
- 使用
collect()
收集您需要的数据,然后broadcast
该数据或仅在您的map
代码中引用它。collect()
将所有数据拉入 master,但对于您的问题最重要的是,这意味着您现在可以在不使用RDD
引用的情况下引用数据,因此您可以将收集到的数据发送给执行者-- 通过使用broadcast
或仅通过在您的map(...)
代码中引用它们(Spark 会将数据的副本发送给您的执行者)。这些中的哪一个有效——或者两者是否有效——取决于你的数据大小、速度预期等。 - 使用
RDD.join()
或RDD.cogroup()
同时处理两个Graphs
。
由于您正在处理高阶结构 -- GraphX Graph
,因此这两者都很复杂。您将不得不分别处理单独的 Graph.vertices
和 Graph.edges
RDDs
,执行您的 collect()
或 join()
,然后重新构建最终的 Graph
通过拼接适当的 RDDs
.