为 createDirectStream 设置 spark.streaming.kafka.maxRatePerPartition

set spark.streaming.kafka.maxRatePerPartition for createDirectStream

我需要为我的应用程序增加每个分区的输入速率,我已经使用 .set("spark.streaming.kafka.maxRatePerPartition",100) 作为配置。流持续时间为 10 秒,因此我希望处理此批次的 5*100*10=5000 消息。但是,我收到的输入速率只有 500 左右。您能否建议任何修改以提高此速率?

The stream duration is 10s so I expect process 5*100*10=5000 messages for this batch.

设置不是这个意思。意思是"how many elements each partition can have per batch",而不是每秒。我假设你有 5 个分区,所以你得到 5 * 100 = 500。如果你想要 5000,请将 maxRatePerPartition 设置为 1000。

来自 "Exactly-once Spark Streaming From Apache Kafka"(由 Cody 撰写,Direct Stream 方法的作者,强调我的):

For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch.

编辑:

@avrs 评论后,I looked inside the code which defines the max rate。事实证明,启发式方法比博客 post 和文档中所述的要复杂一些。

有两个分支。如果背压与 maxRate 一起启用,则 maxRate 是 RateEstimator 对象计算的当前背压率与用户设置的 maxRate 之间的最小值。如果未启用,则采用原样定义的 maxRate。

现在,在选择速率后,它 总是乘以总批次秒数 ,有效地使其成为每秒速率:

if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
} else {
  None
}

属性 每秒从分区中获取 N 条消息。如果我有 M 个分区并且批次间隔是 B,那么我可以在批次中看到的消息总数是 N * M * B.

有几件事你应该验证

  1. 您的输入速率是否 >500 持续 10 秒。
  2. kafka主题是否正确分区。