如何避免在火花流中排队批次

How to avoid queuing up of Batches in spark streaming

我使用 Direct Streaming 进行 Spark 流式传输,我正在使用以下配置

Batch interval 60s

spark.streaming.kafka.maxRatePerPartition 42

auto.offset.reset earliest

因为我使用最早的选项开始流式批处理,为了更快地使用来自 Kafka 的消息并减少延迟,我将 spark.streaming.kafka.maxRatePerPartition 保持为 42。因此它应该使用 42 x 60s x 60 partition = 151200每批记录。

我这里有两个问题

  1. 我看到最初的几个批次正确地消耗了 151200 条记录,在后面的批次中逐渐减少,即使有很多记录要从 kafka 中消耗。请看下面的截图。可能是什么原因
  2. 我看到批次排队很多。我们怎样才能避免这种情况。

是否有可能实现以下场景 我们的batch间隔为60s,如果每批运行s在60s以内,下一批可以准时开始。如果一个批次花费的时间超过 60 秒,我们不希望下一批次来排队。一旦现有 运行 完成,下一个 运行 就可以开始选择记录直到那个时间。这样我们就不会有滞后,也不会排队。

Spark UI - Screenshot for question 1

您观察到的是 Spark 背压机制的预期行为。

您已将配置 spark.streaming.kafka.maxRatePerPartition 设置为 42,并且根据您的计算,作业将开始抓取

42 * 60 partitions * 60 seconds batch interval = 151200 records per batch

查看所附屏幕截图中的时间(处理时间),作业从该数量的记录开始。

但是,由于处理所有这 151200 条记录需要超过 60 秒的时间,背压机制将减少后续批次中的输入记录。这只会在几个批次之后发生,因为背压机制(又名“PID 控制器”)需要等到第一个批次完成,以便它可以使用该经验来估计下一个间隔的输入记录数。如前所述,处理第一个 151200 花费的时间超过一个间隔,这意味着后续两个间隔已经使用 maxRatePerPartition 进行了安排,而没有完成批处理间隔的经验。

这就是为什么您看到输入记录仅在第四批中减少的原因。然后,输入记录的数量仍然太多,无法在 60 秒内处理,因此作业产生越来越多的延迟,PID 控制器(背压)最终意识到它落后于许多记录,并且正在急剧减少输入记录数为spark.streaming.backpressure.pid.minRate设置的最小值。在您的情况下,此值似乎设置为 2,这导致每个批次间隔有 2 * 60 * 60 = 7200 条记录。

总而言之,您观察到的是预期和预期的行为。 Streaming 作业需要一些批次来理解和学习它应该从 Kafka 获取多少数据以适应给定的(非灵活的)60 秒批次间隔。 无论一个批次内的处理时间有多长,您的流式处理作业已经在每 60 秒整提前计划下一个批次。

你可以做什么:

  • 建议将maxRatePerPartition设置为实际容量的150-200%左右。让作业 运行 稍微长一点,您就会看到估计的 100% 是多少。
  • 当您在 Kafka 中使用 60 个分区时,您需要确保数据均匀分布在各个分区中。只有这样 maxRatePerPartition 才会执行您打算执行的操作
  • 有 60 个分区,您可以在 Spark 集群中使用 60 个核心以获得最大消费速度。