spring-cloud-stream kafka离线消费者消息丢失
spring-cloud-stream kafka offline consumer messages lost
我一直在玩 spring-cloud-stream(1.0.0.BUILD-SNAPSHOT with kafka binder)并注意到消费者离线时,发送的任何消息都会丢失。当我启动消费者时,它不会处理发送到 kafka 的积压请求。那是故意的吗?
是;默认是从主题末尾开始收听。
使用
spring:
cloud:
stream:
binder:
kafka:
start-offset: earliest
我们确实需要改进我们的文档,但同时这里有一些提示。
如果你想让消费者处理他们停止时产生的消息,你需要指定一个消费者组名称,例如spring.cloud.stream.bindings.<bindingName>.group=foo
。指定消费者组时,应用程序从以下任一位置开始:a)如果具有相同消费者组的客户端已经有 运行,则为最新未使用的消息(即我们记录了该消费者的消费偏移量)或 b)指定的值by spring.cloud.stream.binder.kafka.start-offset
(可以是earliest
或latest
,代表话题的开始或结束)。所以restarting consumer会保留consumer group,从离开的地方开始消费,新的consumer会按照start选项开始。如果未指定组,则消费者将被视为 'anonymous',并且只对它启动后产生的消息感兴趣,因此它将始终从分区集的末尾开始。
如果你想绕过已经保存的值,那么你可以使用spring.cloud.stream.binder.kafka.reset-offets=true
,这将导致客户端重置保存的偏移量并从spring.cloud.stream.binder.kafka.start-offset
指示的值开始。
这反映了 0.8.2 预期(并支持)的行为。一旦我们升级到 0.9,我们将相应地更新内容。
我一直在玩 spring-cloud-stream(1.0.0.BUILD-SNAPSHOT with kafka binder)并注意到消费者离线时,发送的任何消息都会丢失。当我启动消费者时,它不会处理发送到 kafka 的积压请求。那是故意的吗?
是;默认是从主题末尾开始收听。
使用
spring:
cloud:
stream:
binder:
kafka:
start-offset: earliest
我们确实需要改进我们的文档,但同时这里有一些提示。
如果你想让消费者处理他们停止时产生的消息,你需要指定一个消费者组名称,例如spring.cloud.stream.bindings.<bindingName>.group=foo
。指定消费者组时,应用程序从以下任一位置开始:a)如果具有相同消费者组的客户端已经有 运行,则为最新未使用的消息(即我们记录了该消费者的消费偏移量)或 b)指定的值by spring.cloud.stream.binder.kafka.start-offset
(可以是earliest
或latest
,代表话题的开始或结束)。所以restarting consumer会保留consumer group,从离开的地方开始消费,新的consumer会按照start选项开始。如果未指定组,则消费者将被视为 'anonymous',并且只对它启动后产生的消息感兴趣,因此它将始终从分区集的末尾开始。
如果你想绕过已经保存的值,那么你可以使用spring.cloud.stream.binder.kafka.reset-offets=true
,这将导致客户端重置保存的偏移量并从spring.cloud.stream.binder.kafka.start-offset
指示的值开始。
这反映了 0.8.2 预期(并支持)的行为。一旦我们升级到 0.9,我们将相应地更新内容。