使用 Spark 从顶点创建边

Create Edges from Vertices with Spark

假设我有一个顶点数组,我想以每个顶点连接到下一个 x 顶点的方式从它们创建边。 x 可以有任何整数值。 有没有办法用 Spark 做到这一点?

这是我目前使用 Scala 的结果:

//array that holds the edges
    var edges = Array.empty[Edge[Double]]
    for(j <- 0 to vertices.size - 2) {
      for(i <- 1 to x) {
        if((j+i) < vertices.size) {
          //add edge
          edges = edges ++ Array(Edge(vertices(j)._1, vertices(j+i)._1, 1.0))
          //add inverse edge, we want both directions
          edges = edges ++ Array(Edge(vertices(j+i)._1, vertices(j)._1, 1.0))
        }
      }
    }

其中 vertices 变量是一个 (Long, String) 数组。但是整个过程当然是有顺序的。

编辑:

例如,如果我有这样的顶点:HelloWorldandPlanet cosmos。我需要以下边:Hello -> WorldWorld -> HelloHello -> andand -> HelloHello -> PlanetPlanet -> HelloWorld -> andand -> WorldWorld -> PlanetPlanet -> WorldWorld -> cosmoscosmos -> World 等。

你的意思是这样的吗?

// Add dummy vertices at the end (assumes that you don't use negative ids)
(vertices ++ Array.fill(n)((-1L, null))) 
  .sliding(n + 1) // Slide over n + 1 vertices at the time
  .flatMap(arr => { 
     val (srcId, _) = arr.head // Take first
     // Generate 2n edges
     arr.tail.flatMap{case (dstId, _) => 
       Array(Edge(srcId, dstId, 1.0), Edge(dstId, srcId, 1.0))
     }}.filter(e => e.srcId != -1L & e.dstId != -1L)) // Drop dummies
  .toArray

如果你想 运行 它在 RDD 上,你只需像这样调整初始步骤:

import org.apache.spark.mllib.rdd.RDDFunctions._

val nPartitions = vertices.partitions.size - 1

vertices.mapPartitionsWithIndex((i, iter) =>
  if (i == nPartitions) (iter ++ Array.fill(n)((-1L, null))).toIterator
  else iter)

当然还有 toArray。如果你想要循环连接(尾部连接到头部),你可以用 vertices.take(n) 替换 Array.fill(n)((-1L, null)) 并删除 filter.

所以,我认为这会满足您的需求:

首先,我定义了一个小辅助函数(请注意,我已将此处的边数据设置为顶点名称,以便更容易进行视觉检查):

def pairwiseEdges(list: List[(Long, String)]): List[Edge[String]] = {
  list match {
    case x :: xs => xs.flatMap(i => List(Edge(x._1, i._1, x._2 + "--" + i._2), Edge(i._1, x._1, i._2 + "--" + x._2))) ++ pairwiseEdges(xs)
    case Nil => List.empty
  }
}

我在你的数组上做了一个zipWithIndex得到一个键,然后把数组转换成一个RDD:

val vertices = List((1L,"hello"), (2L,"world"), (3L,"and"), (4L, "planet"), (5L,"cosmos")).toArray
val indexedVertices = vertices.zipWithIndex
val rdd = sc.parallelize(indexedVertices)

然后用x=3生成边:

val edges = rdd
  .flatMap{case((vertexId, name), index) => for {i <- 0 to 3; if (index - i) >= 0} yield ((index - i, (vertexId, name)))}
  .groupByKey()
  .flatMap{case(index, iterable) => pairwiseEdges(iterable.toList)}
  .distinct()

编辑:根据@zero323 在评论中的建议,重写了 flatmap 并删除了 filter

这将生成以下输出:

Edge(1,2,hello--world))
Edge(1,3,hello--and))
Edge(1,4,hello--planet)

Edge(2,1,world--hello)
Edge(2,3,world--and)
Edge(2,4,world--planet)
Edge(2,5,world--cosmos)

Edge(3,1,and--hello)
Edge(3,2,and--world)
Edge(3,4,and--planet)
Edge(3,5,and--cosmos)

Edge(4,1,planet--hello)
Edge(4,2,planet--world)
Edge(4,3,planet--and)
Edge(4,5,planet--cosmos)

Edge(5,2,cosmos--world)
Edge(5,3,cosmos--and)
Edge(5,4,cosmos--planet)