How to attach properties to vertices in a GraphX graph and retrieve the neighbourhood
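I'm fairly new to Spark and Scala... I have a graph: Graph[Int, String] and I'd like to attach to its vertices some properties that I have in a DataFrame.
What I need to do is, for each vertex, find the average value of each property over its neighbourhood. This is my approach so far, but I don't understand how to correctly map the rows I get from joining the two data frames: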
val res = graph.collectNeighbors(EdgeDirection.Either)
  .toDF("ID", "neighbours")
  .join(aDataFrameWithProperties, "ID")
  .map{x => // this is where I am lost
  }
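I don't think my approach is right, because I join the properties of each vertex with the array of its neighbours, but I still don't know the property values of the neighbours...
EDIT
Some data to help understand what I want to accomplish... Suppose you build the graph as in the answer to another question: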
val sqlc : SQLContext = ???
case class Person(id: Long, country: String, age: Int)
val testPeople = Seq(
  Person(1, "Romania"    , 15),
  Person(2, "New Zealand", 30),
  Person(3, "Romania"    , 17),
  Person(4, "Iceland"    , 20),
  Person(5, "Romania"    , 40),
  Person(6, "Romania"    , 44),
  Person(7, "Romania"    , 45),
  Person(8, "Iceland"    , 21),
  Person(9, "Iceland"    , 22)
)
val people = sqlc.createDataFrame(testPeople)
val peopleR = people
  .withColumnRenamed("id"     , "idR")
  .withColumnRenamed("country", "countryR")
  .withColumnRenamed("age"    , "ageR")
import org.apache.spark.sql.functions._
// Connect people from the same country whose ages differ by less than 5
val relations = people.join(peopleR,
  (people("id") < peopleR("idR")) &&
  (people("country") === peopleR("countryR")) &&
  (abs(people("age") - peopleR("ageR")) < 5))
import org.apache.spark.graphx._
// Go through .rdd so that map yields an RDD regardless of Spark version
val edges = EdgeRDD.fromEdges(relations.rdd.map(row => Edge(
  row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))
// id is a Long column, so read it as Long; the vertex attribute is the id as an Int
val users = VertexRDD.apply(people.rdd.map(row =>
  (row.getAs[Long]("id"), row.getAs[Long]("id").toInt)))
val graph = Graph(users, edges)
Then you have a data frame like this:
case class Person(id: Long, gender: Int, income: Int)
val properties = Seq(
  Person(1, 0, 321),
  Person(2, 1, 212),
  Person(3, 0, 212),
  Person(4, 0, 122),
  Person(5, 1, 898),
  Person(6, 1, 212),
  Person(7, 1, 22),
  Person(8, 0, 8),
  Person(9, 0, 212)
)
val people = sqlc.createDataFrame(properties)
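For each vertex I'd like to compute the average gender and the average income of its neighbours, returned as a DataFrame.
Generally speaking, you should use graph operators instead of converting everything to a DataFrame, but something like this should do the trick: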
import org.apache.spark.sql.functions.{explode, avg}
import sqlc.implicits._ // needed for toDF and the $"..." column syntax
val statsDF = graph.collectNeighbors(EdgeDirection.Either)
  .toDF("ID", "neighbours")
  // Flatten neighbours column
  .withColumn("neighbour", explode($"neighbours"))
  // and extract neighbour id
  .select($"ID".alias("this_id"), $"neighbour._1".alias("other_id"))
  // join with people
  .join(people, people("ID") === $"other_id")
  .groupBy($"this_id")
  .agg(avg($"gender"), avg($"income"))
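As a sanity check on what statsDF should contain for the sample data, the same neighbourhood averages can be worked out with plain Scala collections, without Spark. This is only a sketch: the edge list is derived by hand from the relations join condition applied to the sample people (so treat it as an assumption), and NeighbourStats, props, neighbours and stats are names chosen here for illustration.

```scala
object NeighbourStats {
  // Edges derived by hand from the join condition
  // (same country, age difference < 5, id < idR); an assumption, not Spark output.
  val edges: Seq[(Long, Long)] =
    Seq((1L, 3L), (5L, 6L), (6L, 7L), (4L, 8L), (4L, 9L), (8L, 9L))

  // id -> (gender, income), copied from the properties data
  val props: Map[Long, (Int, Int)] = Map(
    1L -> (0, 321), 2L -> (1, 212), 3L -> (0, 212),
    4L -> (0, 122), 5L -> (1, 898), 6L -> (1, 212),
    7L -> (1, 22),  8L -> (0, 8),   9L -> (0, 212)
  )

  // EdgeDirection.Either: every edge counts in both directions
  val neighbours: Map[Long, Seq[Long]] =
    edges.flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2) }

  // id -> (average gender, average income) over the neighbours
  val stats: Map[Long, (Double, Double)] = neighbours.map { case (id, ns) =>
    val (genders, incomes) = ns.map(props).unzip
    id -> (genders.sum.toDouble / ns.size, incomes.sum.toDouble / ns.size)
  }
}
```

Vertex 2 has no edges, so it does not appear in stats. explode produces no rows for an empty array, so the same vertex should be missing from statsDF as well.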
What if, instead of an average, I'd like to count, say, the number of neighbours whose gender is the same as my own, and then find the average over all connections?
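For that you need two separate joins: one on this_id and one on other_id. After that you can simply aggregate with the following expression, where this_gender and other_gender are the gender columns brought in by the two joins:
avg(($"this_gender" === $"other_gender").cast("integer"))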
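The two joins described above could be sketched roughly as follows. It assumes a DataFrame pairs with columns this_id and other_id (the result of the select step in statsDF, before the join with people) and a people DataFrame with at least id and gender. The function name sameGenderShare and the column names this_gender, other_gender and same_gender_share are hypothetical, chosen only for this example.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col}

// Hypothetical helper: share of neighbours that have the same gender as the vertex itself.
// pairs:  columns (this_id, other_id), one row per vertex/neighbour pair
// people: columns (id, gender, ...)
def sameGenderShare(pairs: DataFrame, people: DataFrame): DataFrame = {
  // Two renamed views of people, one for each side of the pair
  val thisSide  = people.select(col("id").alias("this_id"),  col("gender").alias("this_gender"))
  val otherSide = people.select(col("id").alias("other_id"), col("gender").alias("other_gender"))

  pairs
    .join(thisSide, "this_id")    // first join: the vertex's own gender
    .join(otherSide, "other_id")  // second join: the neighbour's gender
    .groupBy("this_id")
    // a boolean cast to integer is 1 for a match and 0 otherwise, so avg gives the share
    .agg(avg((col("this_gender") === col("other_gender")).cast("integer"))
      .alias("same_gender_share"))
}
```

Renaming the columns before joining keeps the two copies of gender distinguishable, which is why the joins can use plain column names rather than qualified references.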
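As for graph operators, there are a few operations you can use. For starters, you can use a join operation to add the properties to the vertices: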
import org.apache.spark.rdd.RDD
val properties: RDD[(VertexId, (Int, Int))] = sc.parallelize(Seq(
  (1L, (0, 321)), (2L, (1, 212)), (3L, (0, 212)),
  (4L, (0, 122)), (5L, (1, 898)), (6L, (1, 212)),
  (7L, (1, 22)), (8L, (0, 8)), (9L, (0, 212))
))
// The vertex attribute becomes (gender, income)
val graphWithProperties = graph
  .outerJoinVertices(properties)((_, _, prop) => prop)
  // For simplicity this assumes no missing values
  .mapVertices((_, props) => props.get)
Next we can aggregate messages to create a new VertexRDD:
val neighboursAggregated = graphWithProperties
  .aggregateMessages[(Int, (Int, Int))](
    triplet => {
      // Each endpoint receives (1, (gender, income)) of the other endpoint
      triplet.sendToDst((1, triplet.srcAttr))
      triplet.sendToSrc((1, triplet.dstAttr))
    },
    // Sum neighbour counts, genders and incomes
    {case ((cnt1, (gender1, inc1)), (cnt2, (gender2, inc2))) =>
      (cnt1 + cnt2, (gender1 + gender2, inc1 + inc2))}
  )
Finally we can replace the existing properties:
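graphWithProperties.outerJoinVertices(neighboursAggregated)(
  (_, oldProps, newProps) => newProps match {
    case Some((cnt, (gender, inc))) => Some((
      // share of neighbours with the same gender as this vertex
      if (oldProps._1 == 1) gender.toDouble / cnt
      else 1 - gender.toDouble / cnt,
      // average income of the neighbours
      inc.toDouble / cnt
    ))
    // vertices without neighbours received no message
    case _ => None
  })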
If you're only interested in the values, you can pass all the required values in aggregateMessages and omit the second outerJoinVertices.
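A sketch of that shortcut, assuming the vertex attribute is (gender, income) as in graphWithProperties. Here the same-gender comparison is done while sending the messages, so no second join against the vertices is needed. neighbourSummary is a hypothetical helper name, and the message layout (count, same-gender count, income sum) is just one possible choice.

```scala
import org.apache.spark.graphx.{Graph, VertexRDD}

// Hypothetical helper: for each vertex with at least one neighbour, returns
// (share of neighbours with the same gender, average neighbour income).
def neighbourSummary[ED](g: Graph[(Int, Int), ED]): VertexRDD[(Double, Double)] =
  g.aggregateMessages[(Int, Int, Int)](
    triplet => {
      val (srcGender, srcIncome) = triplet.srcAttr
      val (dstGender, dstIncome) = triplet.dstAttr
      // Both attributes are visible on the triplet, so compare genders here
      val same = if (srcGender == dstGender) 1 else 0
      // Message: (neighbour count, same-gender count, neighbour income)
      triplet.sendToDst((1, same, srcIncome))
      triplet.sendToSrc((1, same, dstIncome))
    },
    // Merge messages by summing each component
    (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
  ).mapValues((m: (Int, Int, Int)) => (m._2.toDouble / m._1, m._3.toDouble / m._1))
```

Called as neighbourSummary(graphWithProperties), this yields only the vertices that received at least one message. Vertices without neighbours are absent rather than mapped to None.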