reduceByKey 的 numPartitions 不影响速度
numPartitions for reduceByKey not affecting speed
我正在接收时间序列数据,并且只想 persist/upsert 我们根据特定键获得的最新条目。我们过去常常使用 mapWithState
聚合事物,我们能够在我们的 qa 环境中以大约 6k/秒的速度在本地处理大约 1k/秒,在我们最强大的环境中以大约 45k/秒的速度处理。
我删除了很多代码,因为我们有一些需求更改,我认为我看到如此缓慢的行为的原因是 reduceByKey
我有少量代码:
rawStream
.map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
// Select the last value for the key
.reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
accumulator
} else {
aggregationResult
}
}, numPartitions = numberOfReduceTasks)
// Send back table name and aggregate
.map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2) )
本地处理 500 个数据点大约需要 3-5 分钟,而且在我们的 qa 环境中处理小批量时速度相当慢。我理解它应该有一个慢下来,因为之前一切都是一个阶段,现在因为洗牌,它分成两个阶段,洗牌需要很长时间。我应该使用一个理想的 numPartitions
值吗?就像每个核心应该添加 X 个分区,或者每 GB 的内存你应该添加 X 个以上的分区。我一直在本地 运行,并试图弄清楚,但没有什么能真正让我为此获得合理的处理时间。
我在一个 RDD 中有大约 2000 个项目的小型集群上使用 Spark 时也有过类似的经历。重新分区到许多不同的分区计数并没有什么不同。一旦我 运行 它有更多的项目(大约 4000,但我认为这取决于你拥有的执行者的数量),它开始按预期运行。使用更多数据点尝试 运行。
我正在接收时间序列数据,并且只想 persist/upsert 我们根据特定键获得的最新条目。我们过去常常使用 mapWithState
聚合事物,我们能够在我们的 qa 环境中以大约 6k/秒的速度在本地处理大约 1k/秒,在我们最强大的环境中以大约 45k/秒的速度处理。
我删除了很多代码,因为我们有一些需求更改,我认为我看到如此缓慢的行为的原因是 reduceByKey
我有少量代码:
rawStream
.map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
// Select the last value for the key
.reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
accumulator
} else {
aggregationResult
}
}, numPartitions = numberOfReduceTasks)
// Send back table name and aggregate
.map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2) )
本地处理 500 个数据点大约需要 3-5 分钟,而且在我们的 qa 环境中处理小批量时速度相当慢。我理解它应该有一个慢下来,因为之前一切都是一个阶段,现在因为洗牌,它分成两个阶段,洗牌需要很长时间。我应该使用一个理想的 numPartitions
值吗?就像每个核心应该添加 X 个分区,或者每 GB 的内存你应该添加 X 个以上的分区。我一直在本地 运行,并试图弄清楚,但没有什么能真正让我为此获得合理的处理时间。
我在一个 RDD 中有大约 2000 个项目的小型集群上使用 Spark 时也有过类似的经历。重新分区到许多不同的分区计数并没有什么不同。一旦我 运行 它有更多的项目(大约 4000,但我认为这取决于你拥有的执行者的数量),它开始按预期运行。使用更多数据点尝试 运行。