可以使用 Akka Kafka 流限制消耗传入消息的速率
Can Throttle rate of consuming incoming messages using Akka Kafka stream
有没有办法通过一些配置使用 Akka kafka Stream Consumer https://doc.akka.io/docs/alpakka-kafka/0.15/consumer.html 在特定时间段假设 1 分钟只读取 X 条消息。
需要处理在特定时间来自生产者的消息轰炸的情况,因此消费者可能会受到影响。
Akka Streams 中的 throttle
阶段可用于限制元素传递到流中下一阶段的速率(使用 Scala API):
.throttle(100, 1.minute)
这仍然会处理弹幕中的每条消息,但如果突发事件之间有足够的时间处理可以赶上。
请注意,Akka Streams 具有强大的背压机制:下游阶段将只需要它们可以处理的足够多的消息。这实际上使处理过程自我节流:如果这还不够,或者您已经“选择退出”背压,则节流阶段很有用。
如果突发事件之间没有足够的时间来处理,可以在 throttle
之前放置一个 buffer
阶段,并使用 drop-message-if-full 策略。这将防止节流阀的背压传播到 Kafka 消费者,例如:
.buffer(1000, OverflowStrategy.dropHead) // Drop the oldest message
.throttle(100, 1.minute)
有没有办法通过一些配置使用 Akka kafka Stream Consumer https://doc.akka.io/docs/alpakka-kafka/0.15/consumer.html 在特定时间段假设 1 分钟只读取 X 条消息。 需要处理在特定时间来自生产者的消息轰炸的情况,因此消费者可能会受到影响。
Akka Streams 中的 throttle
阶段可用于限制元素传递到流中下一阶段的速率(使用 Scala API):
.throttle(100, 1.minute)
这仍然会处理弹幕中的每条消息,但如果突发事件之间有足够的时间处理可以赶上。
请注意,Akka Streams 具有强大的背压机制:下游阶段将只需要它们可以处理的足够多的消息。这实际上使处理过程自我节流:如果这还不够,或者您已经“选择退出”背压,则节流阶段很有用。
如果突发事件之间没有足够的时间来处理,可以在 throttle
之前放置一个 buffer
阶段,并使用 drop-message-if-full 策略。这将防止节流阀的背压传播到 Kafka 消费者,例如:
.buffer(1000, OverflowStrategy.dropHead) // Drop the oldest message
.throttle(100, 1.minute)