scala中的一跳和两跳引用

One and two hop citation in scala

我有一个 "from node" 和 "to node" 的列表,如下所示:

1234        4567
1234        6789
1234        3456
4567        9876
….

重点是要找出哪个节点最显着,也就是哪个节点的一跳和二跳引用最多:1234 with (4567,6789,3456,9876 (因为连着4567))

目前我所做的只是一个 map 和 reduce 函数来获取出现次数最多的节点,该节点将覆盖单个节点引用。但是我需要涵盖 A -> B 和 B -> C 因此 A -> C 的情况。

查找前十个节点的当前代码:

val textFile = sc.textFile("cit-Patents.txt")

val arrayForm = textFile.filter(_.charAt(0)!='#')

val mapreduce = arrayForm.flatMap(line => line.split("\s+")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2,ascending=false).take(10);

我知道 graphX 也可以帮助解决这个问题,但我不知道该怎么做。

如果您需要更多信息,请告诉我。 谢谢。

我认为根据您的情况,您不需要 spark-graphx。您的问题只需将您的基础 DataFrame 加入自身即可解决,请查看代码:

假设我们有从 X 到 Y 的直接链接的 DataFrame:

val df = Seq(
  (1234, 4567),
  (1234, 6789),
  (1234, 3456),
  (4567, 9876),
  (5, 6),
  (6, 7),
  (6, 8),
  (6, 9),
  (5, 9),
  (6, 10)
).toDF("X", "Y")

我们看到,有些行与另一行具有相同的 YX 值,这意味着我们可以将 DataFrame 连接到自身(让我们使用 ab 别名)按条件:a.Y 应该等于 b.X:

import org.apache.spark.sql.functions._
val twoHopCitation = df.as("a").join(
  df.as("b"), 
  col("a.Y") === col("b.X")
)
  .select(col("a.X").as("X"), col("b.Y").as("Y"))

现在我们看到了从 a.Xb.Y 的所有传递链接:

twoHopCitation.show()
+----+----+
|   X|   Y|
+----+----+
|1234|9876|
|   5|  10|
|   5|   9|
|   5|   8|
|   5|   7|
+----+----+

所以,我们所需要的就是合并这两个 DataFrame,并按 X 计数 Y 聚合并按 count Y 降序排序:

df.union(
  twoHopCitation
)
  .groupBy("X")
  .agg(count(col("Y")).as("cntY"))
  .sort(col("cntY").desc)
  .show()
+----+----+
|   X|cntY|
+----+----+
|   5|   6|
|   6|   4|
|1234|   4|
|4567|   1|
+----+----+