为什么 Spark Streaming 中 2 核和 4 核以及不同分区的处理时间相同?

Why is the processing time the same for 2 and 4 cores and different partitions in Spark Streaming?

我正在尝试 运行 一些关于 Spark 流应用程序处理时间的测试,在我的 4 核机器上以本地模式进行。

这是我的代码:

    SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");

    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));


    JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER);

    streamData1.print();

我每秒收到 1 JSON 条消息。 所以,我针对 4 种不同的场景进行了测试:

1) setMaster(...local[2]) 和 1 个分区

2) setMaster(...local[*]) 和 1 个分区

3)setMaster(...local[2]) 和 4 个分区(使用 streamData1.repartition(4)

4) setMaster(...local[*]) 和 4 个分区(使用 streamData1.repartition(4)

当我检查 UI 中的平均处理时间时,这是我针对每个场景得到的结果:

1) 30 毫秒

2) 28 毫秒

3) 72 毫秒

4) 75 毫秒

我的问题是:为什么 1 和 2、3 和 4 的处理时间几乎相同? 我意识到例如从 2 增加到 4 是正常的,因为重新分区是一个洗牌操作。我不明白的是,例如在 4) 中,为什么处理与 3 如此相似?它不应该更小吗,因为我正在提高并行化水平,并且我有更多的核心来分配任务?

希望我没有混淆, 非常感谢您。

其中一些取决于您的 JSON 消息的外观,我假设每条消息都是一个没有换行符的字符串。在这种情况下,每秒 1 条消息和 1 秒的批次间隔,在每个批次中你将得到一个只有一个项目的 RDD。你不能把它分成多个分区,所以当你重新分区时,你在数据方面仍然有相同的情况,但是有重新分区步骤的开销。

即使数据量更大,当您对数据所做的所有操作都是 print() 时,我也不希望有太大差异:这将获取数据的前 10 项,如果它们可以的话从一个分区开始,我希望 Spark 对其进行优化以仅计算该分区。在任何情况下,如果您显着增加每批数据的数量,并对整个数据集进行一些实际处理,您将获得更具代表性的数字,至少像 streamData1.count().print().

为了更好地理解发生了什么,深入研究 Spark UI 的其他部分也很有用,例如 Stages 选项卡可以告诉您有多少执行时间是洗牌、序列化等,而不是实际执行,以及影响性能的事物,例如 DAG 告诉您哪些位可能被缓存,以及 Spark 能够跳过的任务。