reduceByKey 的 numPartitions 不影响速度

numPartitions for reduceByKey not affecting speed

我正在接收时间序列数据,并且只想 persist/upsert 我们根据特定键获得的最新条目。我们过去常常使用 mapWithState 聚合事物,我们能够在我们的 qa 环境中以大约 6k/秒的速度在本地处理大约 1k/秒,在我们最强大的环境中以大约 45k/秒的速度处理。

我删除了很多代码,因为我们有一些需求更改,我认为我看到如此缓慢的行为的原因是 reduceByKey 我有少量代码:

rawStream
    .map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
    // Select the last value for the key
    .reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
        if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
            accumulator
        } else {
            aggregationResult
        }
    }, numPartitions = numberOfReduceTasks)
    // Send back table name and aggregate
    .map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2) )

本地处理 500 个数据点大约需要 3-5 分钟,而且在我们的 qa 环境中处理小批量时速度相当慢。我理解它应该有一个慢下来,因为之前一切都是一个阶段,现在因为洗牌,它分成两个阶段,洗牌需要很长时间。我应该使用一个理想的 numPartitions 值吗?就像每个核心应该添加 X 个分区,或者每 GB 的内存你应该添加 X 个以上的分区。我一直在本地 运行,并试图弄清楚,但没有什么能真正让我为此获得合理的处理时间。

我在一个 RDD 中有大约 2000 个项目的小型集群上使用 Spark 时也有过类似的经历。重新分区到许多不同的分区计数并没有什么不同。一旦我 运行 它有更多的项目(大约 4000,但我认为这取决于你拥有的执行者的数量),它开始按预期运行。使用更多数据点尝试 运行。