使用 ReactiveKafkaConsumerTemplate 时的轮询行为
Polling behavior when using ReactiveKafkaConsumerTemplate
我有一个 Spring 引导应用程序使用 ReactiveKafkaConsumerTemplate
来消费来自 Kafka 的消息。
我使用 kafkaConsumerTemplate.receive()
消费消息,因此我手动确认每条消息。由于我以异步方式工作,因此不会按顺序处理消息。
我想知道在这种情况下提交和轮询过程是如何工作的 - 如果我轮询了 100 条消息但只确认了其中的 99 条(未确认的消息位于我轮询的 100 条消息的中间,比如第 50 条) ),下一次轮询操作会发生什么?只有在确认了所有 100 条消息(并提交了偏移量)之后,它才会真正进行轮询,直到那时我会一直不断地向我的应用程序获取未确认的消息,直到我确认为止?
Kafka 为消费者维护 2 个偏移量 group/partition - 当前 position()
和已提交的偏移量。当消费者启动时,位置设置为最后提交的偏移量。
每次轮询后都会更新位置,因此下一次轮询永远不会 return 相同的记录,无论它是否已提交(除非执行了查找)。
但是,对于 reactor,您必须确保以正确的顺序执行提交,因为记录不会被单独确认,只会保留提交的偏移量。
如果您乱序提交并重新启动您的应用程序,您可能会重新传送一些已处理的消息。
我们最近在框架中添加了对 out-of-order 提交的支持。
https://projectreactor.io/docs/kafka/release/reference/#_out_of_order_commits
当前版本为1.3.11,包含此功能。
我有一个 Spring 引导应用程序使用 ReactiveKafkaConsumerTemplate
来消费来自 Kafka 的消息。
我使用 kafkaConsumerTemplate.receive()
消费消息,因此我手动确认每条消息。由于我以异步方式工作,因此不会按顺序处理消息。
我想知道在这种情况下提交和轮询过程是如何工作的 - 如果我轮询了 100 条消息但只确认了其中的 99 条(未确认的消息位于我轮询的 100 条消息的中间,比如第 50 条) ),下一次轮询操作会发生什么?只有在确认了所有 100 条消息(并提交了偏移量)之后,它才会真正进行轮询,直到那时我会一直不断地向我的应用程序获取未确认的消息,直到我确认为止?
Kafka 为消费者维护 2 个偏移量 group/partition - 当前 position()
和已提交的偏移量。当消费者启动时,位置设置为最后提交的偏移量。
每次轮询后都会更新位置,因此下一次轮询永远不会 return 相同的记录,无论它是否已提交(除非执行了查找)。
但是,对于 reactor,您必须确保以正确的顺序执行提交,因为记录不会被单独确认,只会保留提交的偏移量。
如果您乱序提交并重新启动您的应用程序,您可能会重新传送一些已处理的消息。
我们最近在框架中添加了对 out-of-order 提交的支持。
https://projectreactor.io/docs/kafka/release/reference/#_out_of_order_commits
当前版本为1.3.11,包含此功能。