在 Spark 中为每个 Executor 创建数组并组合成 RDD
Creating array per Executor in Spark and combine into RDD
我正在从基于 MPI 的系统转向 Apache Spark。我需要在 Spark 中执行以下操作。
假设我有 n
个顶点。我想从这些 n
个顶点创建一个边列表。边只是两个整数 (u,v) 的元组,不需要任何属性。
但是,我想在每个执行器中独立地并行创建它们。因此,我想为 P
个 Spark Executors 独立创建 P
个边数组。每个数组可能有不同的大小并且取决于顶点,因此,我还需要从 0
到 n-1
的执行器 ID。接下来,我想要一个全局 RDD 边数组。
在 MPI 中,我会使用处理器级别在每个处理器中创建一个数组。我如何在 Spark 中做到这一点,尤其是使用 GraphX
库?
因此,我的主要目标是在每个执行器中创建一个边数组,并将它们组合成一个 RDD。
我首先尝试的是鄂尔多斯的一种改良版——人一模型。作为参数,我只有节点数 n 和概率 p。
假设,执行器i
必须处理从101
到200
的节点。对于任何节点,比如节点 101
,它将以概率 p 创建从 101
到 102 -- n
的边。在每个执行程序创建分配的边之后,我将实例化 GraphX EdgeRDD
和 VertexRDD
。因此,我的计划是在每个执行器中独立创建边缘列表,并将它们合并到 RDD
.
让我们从下游处理所需的一些导入和变量开始:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner
val nPartitions: Integer = ???
val n: Long = ???
val p: Double = ???
接下来我们需要一个种子 ID 的 RDD,它可以用来生成边。处理这个问题的一种天真的方法就是这样:
sc.parallelize(0L to n)
由于生成的边数取决于节点 ID,因此这种方法会产生高度倾斜的负载。我们可以通过重新分区做得更好:
sc.parallelize(0L to n)
.map((_, None))
.partitionBy(new HashPartitioner(nPartitions))
.keys
但更好的方法是从空 RDD 开始并就地生成 ID。我们需要一个小帮手:
def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
(0L until n).filter(_ % nPartitions == i).toIterator
}
可以如下使用:
val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))
只是一个快速的健全性检查(它非常昂贵,所以不要在生产中使用它):
require(ids.distinct.count == n)
我们可以使用另一个助手生成实际的边缘:
def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
(i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}
def genEdgesForPartition(iter: Iterator[Long]) = {
// It could be an overkill but better safe than sorry
// Depending on your requirement it could worth to
// consider using commons-math
// https://commons.apache.org/proper/commons-math/userguide/random.html
val random = new Random(new java.security.SecureRandom())
iter.flatMap(genEdgesForId(p, n, random))
}
val edges = ids.mapPartitions(genEdgesForPartition)
终于可以创建图表了:
val graph = Graph.fromEdges(edges, ())
我正在从基于 MPI 的系统转向 Apache Spark。我需要在 Spark 中执行以下操作。
假设我有 n
个顶点。我想从这些 n
个顶点创建一个边列表。边只是两个整数 (u,v) 的元组,不需要任何属性。
但是,我想在每个执行器中独立地并行创建它们。因此,我想为 P
个 Spark Executors 独立创建 P
个边数组。每个数组可能有不同的大小并且取决于顶点,因此,我还需要从 0
到 n-1
的执行器 ID。接下来,我想要一个全局 RDD 边数组。
在 MPI 中,我会使用处理器级别在每个处理器中创建一个数组。我如何在 Spark 中做到这一点,尤其是使用 GraphX
库?
因此,我的主要目标是在每个执行器中创建一个边数组,并将它们组合成一个 RDD。
我首先尝试的是鄂尔多斯的一种改良版——人一模型。作为参数,我只有节点数 n 和概率 p。
假设,执行器i
必须处理从101
到200
的节点。对于任何节点,比如节点 101
,它将以概率 p 创建从 101
到 102 -- n
的边。在每个执行程序创建分配的边之后,我将实例化 GraphX EdgeRDD
和 VertexRDD
。因此,我的计划是在每个执行器中独立创建边缘列表,并将它们合并到 RDD
.
让我们从下游处理所需的一些导入和变量开始:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner
val nPartitions: Integer = ???
val n: Long = ???
val p: Double = ???
接下来我们需要一个种子 ID 的 RDD,它可以用来生成边。处理这个问题的一种天真的方法就是这样:
sc.parallelize(0L to n)
由于生成的边数取决于节点 ID,因此这种方法会产生高度倾斜的负载。我们可以通过重新分区做得更好:
sc.parallelize(0L to n)
.map((_, None))
.partitionBy(new HashPartitioner(nPartitions))
.keys
但更好的方法是从空 RDD 开始并就地生成 ID。我们需要一个小帮手:
def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
(0L until n).filter(_ % nPartitions == i).toIterator
}
可以如下使用:
val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))
只是一个快速的健全性检查(它非常昂贵,所以不要在生产中使用它):
require(ids.distinct.count == n)
我们可以使用另一个助手生成实际的边缘:
def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
(i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}
def genEdgesForPartition(iter: Iterator[Long]) = {
// It could be an overkill but better safe than sorry
// Depending on your requirement it could worth to
// consider using commons-math
// https://commons.apache.org/proper/commons-math/userguide/random.html
val random = new Random(new java.security.SecureRandom())
iter.flatMap(genEdgesForId(p, n, random))
}
val edges = ids.mapPartitions(genEdgesForPartition)
终于可以创建图表了:
val graph = Graph.fromEdges(edges, ())