为 createDirectStream 设置 spark.streaming.kafka.maxRatePerPartition
set spark.streaming.kafka.maxRatePerPartition for createDirectStream
我需要为我的应用程序增加每个分区的输入速率,我已经使用 .set("spark.streaming.kafka.maxRatePerPartition",100)
作为配置。流持续时间为 10 秒,因此我希望处理此批次的 5*100*10=5000
消息。但是,我收到的输入速率只有 500 左右。您能否建议任何修改以提高此速率?
The stream duration is 10s so I expect process 5*100*10=5000 messages
for this batch.
设置不是这个意思。意思是"how many elements each partition can have per batch",而不是每秒。我假设你有 5 个分区,所以你得到 5 * 100 = 500。如果你想要 5000,请将 maxRatePerPartition
设置为 1000。
来自 "Exactly-once Spark Streaming From Apache Kafka"(由 Cody 撰写,Direct Stream 方法的作者,强调我的):
For rate limiting, you can use the Spark configuration variable
spark.streaming.kafka.maxRatePerPartition
to set the maximum number of
messages per partition per batch.
编辑:
@avrs 评论后,I looked inside the code which defines the max rate。事实证明,启发式方法比博客 post 和文档中所述的要复杂一些。
有两个分支。如果背压与 maxRate 一起启用,则 maxRate 是 RateEstimator
对象计算的当前背压率与用户设置的 maxRate 之间的最小值。如果未启用,则采用原样定义的 maxRate。
现在,在选择速率后,它 总是乘以总批次秒数 ,有效地使其成为每秒速率:
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
属性 每秒从分区中获取 N 条消息。如果我有 M 个分区并且批次间隔是 B,那么我可以在批次中看到的消息总数是 N * M * B.
有几件事你应该验证
- 您的输入速率是否 >500 持续 10 秒。
- kafka主题是否正确分区。
我需要为我的应用程序增加每个分区的输入速率,我已经使用 .set("spark.streaming.kafka.maxRatePerPartition",100)
作为配置。流持续时间为 10 秒,因此我希望处理此批次的 5*100*10=5000
消息。但是,我收到的输入速率只有 500 左右。您能否建议任何修改以提高此速率?
The stream duration is 10s so I expect process 5*100*10=5000 messages for this batch.
设置不是这个意思。意思是"how many elements each partition can have per batch",而不是每秒。我假设你有 5 个分区,所以你得到 5 * 100 = 500。如果你想要 5000,请将 maxRatePerPartition
设置为 1000。
来自 "Exactly-once Spark Streaming From Apache Kafka"(由 Cody 撰写,Direct Stream 方法的作者,强调我的):
For rate limiting, you can use the Spark configuration variable
spark.streaming.kafka.maxRatePerPartition
to set the maximum number of messages per partition per batch.
编辑:
@avrs 评论后,I looked inside the code which defines the max rate。事实证明,启发式方法比博客 post 和文档中所述的要复杂一些。
有两个分支。如果背压与 maxRate 一起启用,则 maxRate 是 RateEstimator
对象计算的当前背压率与用户设置的 maxRate 之间的最小值。如果未启用,则采用原样定义的 maxRate。
现在,在选择速率后,它 总是乘以总批次秒数 ,有效地使其成为每秒速率:
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
属性 每秒从分区中获取 N 条消息。如果我有 M 个分区并且批次间隔是 B,那么我可以在批次中看到的消息总数是 N * M * B.
有几件事你应该验证
- 您的输入速率是否 >500 持续 10 秒。
- kafka主题是否正确分区。