可以使用 Akka Kafka 流限制消耗传入消息的速率

Can Throttle rate of consuming incoming messages using Akka Kafka stream

有没有办法通过一些配置使用 Akka kafka Stream Consumer https://doc.akka.io/docs/alpakka-kafka/0.15/consumer.html 在特定时间段假设 1 分钟只读取 X 条消息。 需要处理在特定时间来自生产者的消息轰炸的情况,因此消费者可能会受到影响。

Akka Streams 中的 throttle 阶段可用于限制元素传递到流中下一阶段的速率(使用 Scala API):

.throttle(100, 1.minute)

这仍然会处理弹幕中的每条消息,但如果突发事件之间有足够的时间处理可以赶上。

请注意,Akka Streams 具有强大的背压机制:下游阶段将只需要它们可以处理的足够多的消息。这实际上使处理过程自我节流:如果这还不够,或者您已经“选择退出”背压,则节流阶段很有用。

如果突发事件之间没有足够的时间来处理,可以在 throttle 之前放置一个 buffer 阶段,并使用 drop-message-if-full 策略。这将防止节流阀的背压传播到 Kafka 消费者,例如:

.buffer(1000, OverflowStrategy.dropHead)  // Drop the oldest message
.throttle(100, 1.minute)