许多跳过 Spark 中 Pregel 的阶段 UI
Many skipped stages for Pregel in Spark UI
我尝试 运行 connected components
logNormalGraph
。
val graph: Graph[Long, Int] = GraphGenerators.
logNormalGraph(context.spark, numEParts = 10, numVertices = 1000000,
mu = 0.01, sigma = 0.01)
val minGraph = graph.connectedComponents()
在下一份工作的火花 UI 中,我可以看到跳过阶段的数量不断增加
1 - 4/4 (12 skipped)
2 - 4/4 (23 skipped)
...
50 - 4/4 (4079 skipped)
为什么我在 Pregel 上 运行 时跳过了这么多阶段,为什么这个数字增长得如此之快(非线性)?
循序渐进。 connectedComponents
函数 is implemented using Pregel API。迭代地忽略算法的具体细节:
joinVertices
caching the output
mapReduceTriplets
超过 messages
首先让我们创建虚拟 sendMsg
:
import org.apache.spark.graphx._
def sendMsg(edge: EdgeTriplet[VertexId, Int]):
Iterator[(VertexId, VertexId)] = {
Iterator((edge.dstId, edge.srcAttr))
}
vprog
:
val vprog = (id: Long, attr: Long, msg: Long) => math.min(attr, msg)
和megeMsg
:
val mergeMsg = (a: Long, b: Long) => math.min(a, b)
接下来我们可以initialize示例图:
import org.apache.spark.graphx.util.GraphGenerators
val graph = GraphGenerators.logNormalGraph(
sc, numEParts = 10, numVertices = 100, mu = 0.01, sigma = 0.01)
.mapVertices { case (vid, _) => vid }
val g0 = graph
.mapVertices((vid, vdata) => vprog(vid, vdata, Long.MaxValue))
.cache()
和messages:
val messages0 = g0.mapReduceTriplets(sendMsg, mergeMsg).cache()
由于 GraphXUtils
是私有的,我们必须直接使用 Graph
方法。
当你看一下
生成的DAG
messages0.count
您已经看到了一些跳过的阶段:
执行第一个后iteration
val g1 = g0.joinVertices(messages0)(vprog).cache()
val messages1 = g1.mapReduceTriplets(sendMsg, mergeMsg).cache()
messages1.count
图形大致如下所示:
如果我们继续:
val g2 = g1.joinVertices(messages1)(vprog).cache()
val messages2 = g2.mapReduceTriplets(sendMsg, mergeMsg).cache()
messages2.count
我们得到以下 DAG:
所以这里发生了什么:
- 我们执行迭代算法,该算法两次依赖于相同的数据,一次用于连接,一次用于消息聚合。这导致
g
在每次迭代中依赖的阶段数量增加
- 由于数据被密集缓存(正如您在代码中看到的那样,明确地通过持久化洗牌文件显式地缓存)和检查点(我在这里可能是错的,但检查点通常标记为绿点)必须计算每个阶段仅一次,即使多个下游阶段依赖于它。
- 数据初始化后(
g0
、messages0
)仅从头开始计算最新阶段。
- 如果您仔细观察
DAG
,您会发现存在相当复杂的依赖关系,这应该可以解释 DAG 相对缓慢的增长与跳过的阶段数量之间的剩余差异。
第一个 属性 解释阶段数量的增加,第二个解释阶段被跳过的事实。
我尝试 运行 connected components
logNormalGraph
。
val graph: Graph[Long, Int] = GraphGenerators.
logNormalGraph(context.spark, numEParts = 10, numVertices = 1000000,
mu = 0.01, sigma = 0.01)
val minGraph = graph.connectedComponents()
在下一份工作的火花 UI 中,我可以看到跳过阶段的数量不断增加
1 - 4/4 (12 skipped)
2 - 4/4 (23 skipped)
...
50 - 4/4 (4079 skipped)
为什么我在 Pregel 上 运行 时跳过了这么多阶段,为什么这个数字增长得如此之快(非线性)?
循序渐进。 connectedComponents
函数 is implemented using Pregel API。迭代地忽略算法的具体细节:
joinVertices
caching the outputmapReduceTriplets
超过messages
首先让我们创建虚拟 sendMsg
:
import org.apache.spark.graphx._
def sendMsg(edge: EdgeTriplet[VertexId, Int]):
Iterator[(VertexId, VertexId)] = {
Iterator((edge.dstId, edge.srcAttr))
}
vprog
:
val vprog = (id: Long, attr: Long, msg: Long) => math.min(attr, msg)
和megeMsg
:
val mergeMsg = (a: Long, b: Long) => math.min(a, b)
接下来我们可以initialize示例图:
import org.apache.spark.graphx.util.GraphGenerators
val graph = GraphGenerators.logNormalGraph(
sc, numEParts = 10, numVertices = 100, mu = 0.01, sigma = 0.01)
.mapVertices { case (vid, _) => vid }
val g0 = graph
.mapVertices((vid, vdata) => vprog(vid, vdata, Long.MaxValue))
.cache()
和messages:
val messages0 = g0.mapReduceTriplets(sendMsg, mergeMsg).cache()
由于 GraphXUtils
是私有的,我们必须直接使用 Graph
方法。
当你看一下
生成的DAGmessages0.count
您已经看到了一些跳过的阶段:
执行第一个后iteration
val g1 = g0.joinVertices(messages0)(vprog).cache()
val messages1 = g1.mapReduceTriplets(sendMsg, mergeMsg).cache()
messages1.count
图形大致如下所示:
如果我们继续:
val g2 = g1.joinVertices(messages1)(vprog).cache()
val messages2 = g2.mapReduceTriplets(sendMsg, mergeMsg).cache()
messages2.count
我们得到以下 DAG:
所以这里发生了什么:
- 我们执行迭代算法,该算法两次依赖于相同的数据,一次用于连接,一次用于消息聚合。这导致
g
在每次迭代中依赖的阶段数量增加 - 由于数据被密集缓存(正如您在代码中看到的那样,明确地通过持久化洗牌文件显式地缓存)和检查点(我在这里可能是错的,但检查点通常标记为绿点)必须计算每个阶段仅一次,即使多个下游阶段依赖于它。
- 数据初始化后(
g0
、messages0
)仅从头开始计算最新阶段。 - 如果您仔细观察
DAG
,您会发现存在相当复杂的依赖关系,这应该可以解释 DAG 相对缓慢的增长与跳过的阶段数量之间的剩余差异。
第一个 属性 解释阶段数量的增加,第二个解释阶段被跳过的事实。