给定惰性执行风格,如何在 Spark 中计时转换?

How to time a transformation in Spark, given lazy execution style?

我有一些代码执行了很多步骤,我知道整个过程需要多长时间。但是,我希望能够计算出每个单独的转换需要多长时间。以下是一些简单的步骤示例:

rdd1 = sc.textFile("my/filepath/*").map(lambda x: x.split(","))
rdd2 = sc.textFile("other/filepath/*").map(lambda x: x.split(","))
to_kv1 = rdd1.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric
to_kv2 = rdd2.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric
reduced1 = to_kv1.reduceByKey(lambda a, b: a+b)
reduced2 = to_kv1.reduceByKey(lambda a, b: a+b)
outer_joined = reduced1.fullOuterJoin(reduced2) # let's just assume there is key overlap
outer_joined.saveAsTextFile("my_output")

现在:如何对这段代码的特定部分进行基准测试?我知道 运行 端到端连接它需要一定的时间(saveAsTextFile 会强制它执行),但是我如何只对 reduceByKey 或 [=13= 进行基准测试] 部分代码?我知道我可以在每次操作后 运行 count() 强制执行,但这不会正确地对操作进行基准测试,因为它增加了执行 count 所需的时间以及时间执行转换。

考虑到 Spark 转换的惰性执行风格,最好的基准测试方法是什么?

请注意,我不是在问如何测量时间。我知道 time 模块,start = time.time() 等。我问的是如何在给定 Spark 转换的惰性执行风格的情况下进行基准测试,直到您调用需要返回信息的操作后才会执行driver.

最好的办法是使用 Spark UI 来阅读此信息。问题有两个:

  • 计算是分布式的,因此即使您在每个转换中添加了计时机制,也很难判断任务何时真正完成,因为它可以在一台机器上完成,但不能在另一台机器上完成。也就是说,您可以在内部添加日志记录并找到执行的第一个实例,然后找到最终执行。不过请记住下一点
  • 转换尽可能流水线化。因此,Spark 将 运行 同时进行多个转换以提高效率,因此您必须明确地测试一个操作。