How to retrieve the value of a property using the value of another property in RDDs
I have a links:JdbcRDD[String] containing links in the form:
{"bob,michael"}
where the two strings are respectively the source and the destination of each link.
I can split each string to retrieve the string that uniquely identifies the source node and the destination node.
I also have a users:RDD[(Long, Vertex)] containing all the vertices in my graph.
Each vertex has a nameId:String property and a nodeId:Long property.
I would like to retrieve the nodeId from the stringId, but I don't know how to implement this logic, being new to both Scala and Spark. I am stuck with this code:
val reflinks = links.map { x =>
    // split each line into an array of node identifiers
    val row = x.split(',')
    // retrieve the vertex ids matching the row(0) and row(1) values
    val source = users.filter(_._2.stringId == row(0)).collect()
    val dest = users.filter(_._2.stringId == row(1)).collect()
    // return the link in GraphX format
    Edge(source(0)._1, dest(0)._1, "referral")
}
With this solution I get:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
Unfortunately, you cannot nest RDDs in Spark; that is, you cannot access one RDD while you are inside the closure sent to another RDD.
If you want to combine knowledge from more than one RDD, you need to join them somehow. Here is one way to solve this problem:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
// These are some toy examples of the original data for the edges and the vertices
val rawEdges = sc.parallelize(Array("m,a", "c,a", "g,c"))
val rawNodes = sc.parallelize(Array(("m", 1L), ("a", 2L), ("c", 3L), ("g", 4L)))
val parsedEdges: RDD[(String, String)] = rawEdges.map(x => x.split(",")).map { case Array(x, y) => (x, y) }
// The two joins here are required since we need to get the ID for both nodes of each edge.
// If you want to stay in the RDD domain, you need to do this double join.
// First join: keyed on the source name, it yields (sourceName, (destName, sourceId)), which we re-key on destName.
val resolvedFirstRdd = parsedEdges.join(rawNodes).map { case (firstTxt, (secondTxt, firstId)) => (secondTxt, firstId) }
// Second join: resolves the destination name, yielding (destName, (sourceId, destId)).
val edgeRdd = resolvedFirstRdd.join(rawNodes).map { case (secondTxt, (firstId, secondId)) => Edge(firstId, secondId, "ref") }
// The println()s are here for testing (they can be expensive to keep in the actual code)
edgeRdd.foreach(println)
val g = Graph(rawNodes.map(x => (x._2, x._1)), edgeRdd)
println("In degrees")
g.inDegrees.foreach(println)
println("Out degrees")
g.outDegrees.foreach(println)
Output of the test prints:
Edge(3,2,ref)
Edge(1,2,ref)
Edge(4,3,ref)
In degrees
(3,1)
(2,2)
Out degrees
(3,1)
(1,1)
(4,1)
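If the vertex list is small enough to fit in the driver's memory, an alternative to the double join is to collect it into a map and broadcast that map to the executors, so each edge can be resolved with two lookups instead of two shuffles. Below is a minimal sketch of that approach, reusing the toy rawEdges and rawNodes from above; the name idMap is my own:

import org.apache.spark.graphx._
// collectAsMap() pulls the whole vertex set into the driver,
// so this assumes the vertex list is small
val idMap = sc.broadcast(rawNodes.collectAsMap())
val edgeRddViaBroadcast = rawEdges.map { line =>
    val Array(src, dst) = line.split(",")
    // look up both endpoints in the broadcast map instead of joining
    Edge(idMap.value(src), idMap.value(dst), "ref")
}
edgeRddViaBroadcast.foreach(println)

This avoids the shuffles that the two joins incur, at the cost of holding the whole vertex map on every executor; for a large vertex set, the join-based version above is the safer choice.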