通过在每次应用程序重新启动时创建一个新的消费者组来重置主题偏移量
Reset topic offset by creating a new consumer group at every application restart
我有一个 Kafka 主题和一个消费者,在 Spring 云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次重新启动应用程序时,我都需要从头开始阅读所有收到的消息。这本应由 resetOffsets
属性 实现,但从 this issue 可以清楚地看出它目前不起作用。
我发现 this workaround 在 kafka consumer api 中使用,建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始读取的一种方式。是 Spring Cloud Stream 中的 possible/recommended 吗?如何为消费者组定义动态名称?
是的,它也适用于 SCSt,但是正如您所说,设置随机组 ID 有点棘手,尽管您可以在启动 SpringApplication
之前将其设置为 System.property
.
如果你直接使用spring-kafka,这很简单,只需实现ConsumerSeekAware
,当分配分区时你可以seekToBeginning
。
但是,使用 SCSt,您无法直接访问侦听器。
一种解决方法是在启动 SpringApplication
之前通过创建具有相同组 ID 的消费者手动执行搜索。这有点棘手,但如果您有多个应用程序实例,因为您每次都可能获得不同的分区。
我们会再次考虑解决该问题(我刚刚对此发表了评论)。
如果您需要应用程序每次都从头开始重新启动,您有几个选择:
您可以在使用 kafka-consumer-groups.sh
工具 (kafka.admin.ConsumerGroupCommand.scala
)
[=38= 重新启动应用程序之前将提交的偏移量重置为 earliest
]
重新启动后,应用程序可以查找到开头并手动提交偏移量 0。如果将 auto.offset.reset
设置为 earliest
,即使 0 不是有效偏移量,它将从头开始。
您每次都可以使用不同的消费者 group.id
值。在您的 Consumer Configuration
bean 中,在 Properties
对象中插入如下内容:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
最后,你到底有没有使用委员会抵消?如果没有,只需禁用 enable.auto.commit
然后应用程序将始终遵循 auto.offset.reset
设置。
选项 1 和 2 通常是首选,因为它们保持一致 group.id
允许轻松地将消费者实例添加到组并监视组。
in spring-kafka, 如果您通过配置文件配置消费者,例如 application.yaml (而不是以编程方式)你可以用 SpEL (spring 表达语言)来提供一个 UUID-based consumer group执行+最早偏移量
# consume-all-configuration
auto-offset-reset: earliest
group: consumer-local-#{ T(java.util.UUID).randomUUID().toString() }
我有一个 Kafka 主题和一个消费者,在 Spring 云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次重新启动应用程序时,我都需要从头开始阅读所有收到的消息。这本应由 resetOffsets
属性 实现,但从 this issue 可以清楚地看出它目前不起作用。
我发现 this workaround 在 kafka consumer api 中使用,建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始读取的一种方式。是 Spring Cloud Stream 中的 possible/recommended 吗?如何为消费者组定义动态名称?
是的,它也适用于 SCSt,但是正如您所说,设置随机组 ID 有点棘手,尽管您可以在启动 SpringApplication
之前将其设置为 System.property
.
如果你直接使用spring-kafka,这很简单,只需实现ConsumerSeekAware
,当分配分区时你可以seekToBeginning
。
但是,使用 SCSt,您无法直接访问侦听器。
一种解决方法是在启动 SpringApplication
之前通过创建具有相同组 ID 的消费者手动执行搜索。这有点棘手,但如果您有多个应用程序实例,因为您每次都可能获得不同的分区。
我们会再次考虑解决该问题(我刚刚对此发表了评论)。
如果您需要应用程序每次都从头开始重新启动,您有几个选择:
您可以在使用
[=38= 重新启动应用程序之前将提交的偏移量重置为kafka-consumer-groups.sh
工具 (kafka.admin.ConsumerGroupCommand.scala
)earliest
]重新启动后,应用程序可以查找到开头并手动提交偏移量 0。如果将
auto.offset.reset
设置为earliest
,即使 0 不是有效偏移量,它将从头开始。您每次都可以使用不同的消费者
group.id
值。在您的 ConsumerConfiguration
bean 中,在Properties
对象中插入如下内容:properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
最后,你到底有没有使用委员会抵消?如果没有,只需禁用 enable.auto.commit
然后应用程序将始终遵循 auto.offset.reset
设置。
选项 1 和 2 通常是首选,因为它们保持一致 group.id
允许轻松地将消费者实例添加到组并监视组。
in spring-kafka, 如果您通过配置文件配置消费者,例如 application.yaml (而不是以编程方式)你可以用 SpEL (spring 表达语言)来提供一个 UUID-based consumer group执行+最早偏移量
# consume-all-configuration
auto-offset-reset: earliest
group: consumer-local-#{ T(java.util.UUID).randomUUID().toString() }