使用 Spark Streaming 时限制 Kafka 批处理大小

Limit Kafka batches size when using Spark Streaming

是否可以限制 Kafka 消费者为 Spark Streaming 返回的批次大小?

我问是因为我得到的第一批有数亿条记录,处理和检查它们需要很长时间。

我认为您的问题可以通过 Spark Streaming Backpressure.

解决

检查 spark.streaming.backpressure.enabledspark.streaming.backpressure.initialRate

默认情况下 spark.streaming.backpressure.initialRate 未设置 并且 spark.streaming.backpressure.enabled 默认情况下 禁用 所以我想 spark 会能拿多少就拿多少。

来自 Apache Spark Kafka configuration

spark.streaming.backpressure.enabled:

This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below).

并且由于您想控制第一批,或者更具体地说 - 第一批中的消息数量,我认为您需要 spark.streaming.backpressure.initialRate

spark.streaming.backpressure.initialRate:

This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

当你的 Spark 工作(分别是 Spark 工作人员)能够处理来自 kafka 的 10000 条消息时,这个很好,但是 kafka 经纪人给你的工作 100000 条消息。

也许您也有兴趣查看 spark.streaming.kafka.maxRatePerPartition 以及 Jeroen van Wilgenburg on his blog 对这些属性的一些研究和建议。

除上述答案外。批量大小是 3 个参数的乘积

  1. batchDuration: 流式数据分批的时间间隔(以秒为单位)。
  2. spark.streaming.kafka.maxRatePerPartition:设置每秒每个分区的最大消息数。这与 batchDuration 结合使用时将控制批量大小。您希望 maxRatePerPartition 设置得很大(否则您会有效地限制您的工作)并且 batchDuration 非常小。
  3. kafka 主题中的分区数

为了更好地解释该产品在背压下如何工作 enable/disable ()

限制最大批大小将极大地帮助控制处理时间,但是,它会增加消息的处理延迟。

通过属性下面的设置,我们可以控制批量大小 spark.streaming.receiver.maxRate= spark.streaming.kafka.maxRatePerPartition=

您甚至可以通过启用背压,根据处理时间动态设置批处理大小 spark.streaming.backpressure.enabled:真 spark.streaming.backpressure.initialRate: