Spark Graphx:从邻接矩阵加载图形

Spark Graphx: Loading a graph from adjacency matrix

我一直在试验 Spark 的 Graphx API,主要是为了学习和感受如何使用它们。在此过程中,我必须将邻接矩阵加载到图中。矩阵数据集为here.

根据网站,矩阵被描述为


A number of employees in a factory was interviewed on a question: “Do you like to work with your co-worker?”. Possible answers are 1 for yes and 0 for no. Each employee gave an answer for each other employee thus creating an adjecancy matrix.


所以,我决定将员工命名为英文字母("A"以后)。员工构成图的节点,他们对同事的偏好构成边。我还没有在 Spark 中找到任何直接的方法来实现这一点;我的 R 程序员朋友告诉我,在他们的世界里这样做很容易。因此,我着手编写一个简单的实现来这样做。这是代码

val conf = new SparkConf().setMaster("local[*]").setAppName("GraphExploration App")
val spark = SparkSession
  .builder()
  .appName("Spark SQL: beginners exercise")
  .getOrCreate()     

    val sc = SparkContext.getOrCreate(conf)

      val df = spark.read.csv("./BlogInputs/sociogram-employees-un.csv").cache

      val allRows = df.toLocalIterator.toIndexedSeq

      type EmployeeVertex = (Long,String)

      val employeesWithNames = (0 until allRows.length).map(i => (i.toLong,((i + 'A').toChar.toString())))

      val columnNames   = (0 until allRows.length).map(i => ("_c" + i)).toIndexedSeq // It is a square matrix; rows == columns

      val edgesAsCollected = (for {
            rowIndex <- 0 until df.count.toInt
            colIndex <- 0 until df.count.toInt
            if (rowIndex != colIndex)
            } yield {

                    if (allRows(rowIndex).fieldIndex(columnNames(colIndex)) == 1)
                        Some(Edge(employeesWithNames(rowIndex)._1,employeesWithNames(colIndex)._1,"Likes"))
                    else
                       None

            }).flatten

       val employeeNodes = sc.parallelize(employeesWithNames)
       val edges = sc.parallelize(edgesAsCollected)

       val employeeGraph = Graph(sc.parallelize(employeesWithNames),edges,"Nobody")

架构如下:

scala>df.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)

..这里是前几行

scala> df.show
16/12/21 07:12:00 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_8_0]
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|  1|  0|  1|  1|  0|  1|  1|  1|  0|   0|   1|   0|   1|   1|   0|   1|   1|   0|   1|   0|   1|   0|   1|   1|
|  1|  0|  0|  1|  0|  0|  1|  0|  1|  0|   0|   1|   0|   0|   1|   0|   1|   0|   1|   0|   0|   1|   0|   1|   0|
|  0|  1|  0|  1|  1|  0|  0|  0|  1|  0|   0|   0|   0|   1|   1|   0|   0|   1|   0|   0|   0|   1|   1|   0|   1|
|  0|  1|  1|  0|  0|  0|  1|  0|  0|  0|   1|   1|   0|   1|   0|   0|   1|   1|   0|   0|   1|   0|   1|   1|   0|

这符合我的目的,但我觉得可能有不同的方式。我对 Spark 的 MLLib API 知之甚少可能是一个障碍。有人可以对此发表评论吗?更好的是,有人可以告诉我更好但更简单的方法(如有必要,通过编辑我的代码)吗?

对于手头的案例,我认为@DanieldePaula 的建议可以作为答案:

As the matrix is square, a very large number of rows would imply a very large number of columns, in which case using SparkSQL wouldn't seem optimal in my opinion. I think you can use Spark for this problem if the matrix is converted into a Sparse format, e.g. RDD[(row, col, value)], then it would be very easy to create your vertices and edges.

谢谢,丹尼尔!