在Spark中将循环分发到集群的不同机器

Distributing a loop to different machines of a cluster in Spark

这是我在代码中 运行 的 for 循环:

 for(x<-0 to vertexArray.length-1)
  {
    for(y<-0 to vertexArray.length-1)
      {
        breakable {

          if (x.equals(y)) {
            break
          }
          else {
            var d1 = vertexArray(x)._2._2
            var d2 = vertexArray(y)._2._2
            val ps = new Period(d1, d2)

            if (ps.getMonths() == 0 && ps.getYears() == 0 && Math.abs(ps.toStandardHours().getHours()) <= 5) {
              edgeArray += Edge(vertexArray(x)._1, vertexArray(y)._1, Math.abs(ps.toStandardHours().getHours()))
            }
          }
        }
      }
  }

我想通过将此代码分布到集群中的多台计算机来加快 运行 的运行速度。我在带有 Spark 的 intelliJ-idea 上使用 Scala。我将如何实现这种类型的代码以在多台机器上工作?

欢迎来到 Stack Overflow。不幸的是,这不是正确的方法;(

Spark 不是并行化任务的工具,而是并行化数据的工具。

因此您需要考虑如何 distribute/parallelize/partition 您的数据,然后计算各个分区,最后合并结果。

此外,您还需要阅读有关 Spark 的一般知识。这里的简单答案无法让您入门。这只是错误的格式。

从这里开始:http://spark.apache.org/docs/latest/programming-guide.html

正如 Mariano Kamp 所说,Spark 在这里可能不是一个好的选择,还有更好的选择。除此之外,任何必须处理相对较大的数据并需要 O(N^2) 时间的方法都是不可接受的。所以你应该做的第一件事是专注于选择合适的算法而不是平台。

仍然可以将其转换为 Spark。直接反映您的代码的一种天真的方法是使用笛卡尔积:

def check(v1: T, v2: T): Option[U] = {
  if (v1 == v2) {
    None
  } else {
    // rest of your logic, Some[U] if all tests passed
    // None otherwise
    ???
  }
}

val vertexRDD = sc.parallelize(vertexArray)
  .map{case (v1, v2) => check(v1, 2)}
  .filter(_.isDefined)
  .map(_.get)

如果 vertexArray 很小,您可以将 flatMap 与广播变量一起使用

val vertexBd = sc.broadcast(vertexArray)

vertexRDD.flatMap(v1 =>
  vertexBd.map(v2 => check(v1, v2)).filter(_.isDefined).map(_.get))
)

另一个改进是执行正确的连接。明显的条件是年月:

def toPair(v: T): ((Int, Int), T) = ??? // Return ((year, month), vertex)

val vertexPairs = vertexRDD.map(toPair)

vertexPairs.join(vertexPairs)
  .map{case ((_, _), (v1, v2)) => check(v1, v2) // Check should be simplified
  .filter(_.isDefined)
  .map(_.get)

当然这也可以通过广播变量来实现。您只需按(年、月)对分组 vertexArray 并广播 Map[(Int, Int), T].

从这里您可以通过避免按分区进行简单检查和遍历按时间戳排序的数据来进一步改进:

def sortPartitionByDatetime(iter: Iterator[U]): Iterator[U] = ???
def yieldMatching(iter: Iterator[U]): Iterator[V] = {
  // flatmap keeping track of values in open window
  ???
}

vertexPairs
  .partitionBy(new HashPartitioner(n))
  .mapPartitions(sortPartitionByDatetime)
  .mapPartitions(yieldMatching)

或使用带有 window 函数和范围子句的 DataFrame。

注:

所有类型都只是占位符。以后请尽量提供类型信息。现在我只能说涉及一些元组和日期