scala中的一跳和两跳引用
One and two hop citation in scala
我有一个 "from node" 和 "to node" 的列表,如下所示:
1234 4567
1234 6789
1234 3456
4567 9876
….
重点是要找出哪个节点最显着,也就是哪个节点的一跳和二跳引用最多:1234 with (4567,6789,3456,9876 (因为连着4567))
目前我所做的只是一个 map 和 reduce 函数来获取出现次数最多的节点,该节点将覆盖单个节点引用。但是我需要涵盖 A -> B 和 B -> C 因此 A -> C 的情况。
查找前十个节点的当前代码:
val textFile = sc.textFile("cit-Patents.txt")
val arrayForm = textFile.filter(_.charAt(0)!='#')
val mapreduce = arrayForm.flatMap(line => line.split("\s+")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2,ascending=false).take(10);
我知道 graphX 也可以帮助解决这个问题,但我不知道该怎么做。
如果您需要更多信息,请告诉我。
谢谢。
我认为根据您的情况,您不需要 spark-graphx。您的问题只需将您的基础 DataFrame
加入自身即可解决,请查看代码:
假设我们有从 X 到 Y 的直接链接的 DataFrame:
val df = Seq(
(1234, 4567),
(1234, 6789),
(1234, 3456),
(4567, 9876),
(5, 6),
(6, 7),
(6, 8),
(6, 9),
(5, 9),
(6, 10)
).toDF("X", "Y")
我们看到,有些行与另一行具有相同的 Y
值 X
值,这意味着我们可以将 DataFrame 连接到自身(让我们使用 a
和 b
别名)按条件:a.Y
应该等于 b.X
:
import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
df.as("b"),
col("a.Y") === col("b.X")
)
.select(col("a.X").as("X"), col("b.Y").as("Y"))
现在我们看到了从 a.X
到 b.Y
的所有传递链接:
twoHopCitation.show()
+----+----+
| X| Y|
+----+----+
|1234|9876|
| 5| 10|
| 5| 9|
| 5| 8|
| 5| 7|
+----+----+
所以,我们所需要的就是合并这两个 DataFrame,并按 X
计数 Y
聚合并按 count Y
降序排序:
df.union(
twoHopCitation
)
.groupBy("X")
.agg(count(col("Y")).as("cntY"))
.sort(col("cntY").desc)
.show()
+----+----+
| X|cntY|
+----+----+
| 5| 6|
| 6| 4|
|1234| 4|
|4567| 1|
+----+----+
我有一个 "from node" 和 "to node" 的列表,如下所示:
1234 4567
1234 6789
1234 3456
4567 9876
….
重点是要找出哪个节点最显着,也就是哪个节点的一跳和二跳引用最多:1234 with (4567,6789,3456,9876 (因为连着4567))
目前我所做的只是一个 map 和 reduce 函数来获取出现次数最多的节点,该节点将覆盖单个节点引用。但是我需要涵盖 A -> B 和 B -> C 因此 A -> C 的情况。
查找前十个节点的当前代码:
val textFile = sc.textFile("cit-Patents.txt")
val arrayForm = textFile.filter(_.charAt(0)!='#')
val mapreduce = arrayForm.flatMap(line => line.split("\s+")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2,ascending=false).take(10);
我知道 graphX 也可以帮助解决这个问题,但我不知道该怎么做。
如果您需要更多信息,请告诉我。 谢谢。
我认为根据您的情况,您不需要 spark-graphx。您的问题只需将您的基础 DataFrame
加入自身即可解决,请查看代码:
假设我们有从 X 到 Y 的直接链接的 DataFrame:
val df = Seq(
(1234, 4567),
(1234, 6789),
(1234, 3456),
(4567, 9876),
(5, 6),
(6, 7),
(6, 8),
(6, 9),
(5, 9),
(6, 10)
).toDF("X", "Y")
我们看到,有些行与另一行具有相同的 Y
值 X
值,这意味着我们可以将 DataFrame 连接到自身(让我们使用 a
和 b
别名)按条件:a.Y
应该等于 b.X
:
import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
df.as("b"),
col("a.Y") === col("b.X")
)
.select(col("a.X").as("X"), col("b.Y").as("Y"))
现在我们看到了从 a.X
到 b.Y
的所有传递链接:
twoHopCitation.show()
+----+----+
| X| Y|
+----+----+
|1234|9876|
| 5| 10|
| 5| 9|
| 5| 8|
| 5| 7|
+----+----+
所以,我们所需要的就是合并这两个 DataFrame,并按 X
计数 Y
聚合并按 count Y
降序排序:
df.union(
twoHopCitation
)
.groupBy("X")
.agg(count(col("Y")).as("cntY"))
.sort(col("cntY").desc)
.show()
+----+----+
| X|cntY|
+----+----+
| 5| 6|
| 6| 4|
|1234| 4|
|4567| 1|
+----+----+