许多跳过 Spark 中 Pregel 的阶段 UI

Many skipped stages for Pregel in Spark UI

我尝试 运行 connected components logNormalGraph

val graph: Graph[Long, Int] = GraphGenerators.
    logNormalGraph(context.spark, numEParts = 10, numVertices = 1000000,
        mu = 0.01, sigma = 0.01)

val minGraph = graph.connectedComponents()

在下一份工作的火花 UI 中,我可以看到跳过阶段的数量不断增加

1 - 4/4 (12 skipped)
2 - 4/4 (23 skipped)
...
50 - 4/4 (4079 skipped)

为什么我在 Pregel 上 运行 时跳过了这么多阶段,为什么这个数字增长得如此之快(非线性)?

循序渐进。 connectedComponents 函数 is implemented using Pregel API。迭代地忽略算法的具体细节:

首先让我们创建虚拟 sendMsg:

import org.apache.spark.graphx._

def sendMsg(edge: EdgeTriplet[VertexId, Int]): 
    Iterator[(VertexId, VertexId)] = {
  Iterator((edge.dstId, edge.srcAttr))
}

vprog:

val vprog =  (id: Long, attr: Long, msg: Long) => math.min(attr, msg)

megeMsg:

val mergeMsg = (a: Long, b: Long) => math.min(a, b)

接下来我们可以initialize示例图:

import org.apache.spark.graphx.util.GraphGenerators

val graph = GraphGenerators.logNormalGraph(
   sc, numEParts = 10, numVertices = 100,  mu = 0.01, sigma = 0.01)
  .mapVertices { case (vid, _) => vid }

val g0 = graph
  .mapVertices((vid, vdata) => vprog(vid, vdata, Long.MaxValue))
  .cache()

messages:

val messages0 = g0.mapReduceTriplets(sendMsg, mergeMsg).cache()

由于 GraphXUtils 是私有的,我们必须直接使用 Graph 方法。

当你看一下

生成的DAG
messages0.count

您已经看到了一些跳过的阶段:

执行第一个后iteration

val g1 = g0.joinVertices(messages0)(vprog).cache()
val messages1 = g1.mapReduceTriplets(sendMsg, mergeMsg).cache()
messages1.count

图形大致如下所示:

如果我们继续:

val g2 = g1.joinVertices(messages1)(vprog).cache()
val messages2 = g2.mapReduceTriplets(sendMsg, mergeMsg).cache()
messages2.count

我们得到以下 DAG:

所以这里发生了什么:

  • 我们执行迭代算法,该算法两次依赖于相同的数据,一次用于连接,一次用于消息聚合。这导致 g 在每次迭代中依赖的阶段数量增加
  • 由于数据被密集缓存(正如您在代码中看到的那样,明确地通过持久化洗牌文件显式地缓存)和检查点(我在这里可能是错的,但检查点通常标记为绿点)必须计算每个阶段仅一次,即使多个下游阶段依赖于它。
  • 数据初始化后(g0messages0)仅从头开始计算最新阶段。
  • 如果您仔细观察 DAG,您会发现存在相当复杂的依赖关系,这应该可以解释 DAG 相对缓慢的增长与跳过的阶段数量之间的剩余差异。

第一个 属性 解释阶段数量的增加,第二个解释阶段被跳过的事实。