有没有办法在 DeadLetterPublishingRecoverer 中找到消费者组?
Is there a way to find consumer group in DeadLetterPublishingRecoverer?
我有一个使用 DLQ 方法的项目,在 @KafkaListener
中出现任何异常时,错误将发送到具有结构 error-<topic>-<consumergroup>
的主题。当我们想要手动重试来自该 DLQ 的 kafka 消息时,我们会将其生成到具有相似结构 retry-<topic>-<consumergroup>
的主题。因此,例如,如果我们有一个主要主题 foo
,由消费者组 bar
消费,我们将有主题:foo
、error-foo-bar
和 retry-foo-bar
.
这样,我们可以为特定的消费者组重试消息。消息处理程序从主要主题和重试主题中读取。
由于我们的项目中有多个监听器都使用同一个容器,我们将主题(主要、错误和重试)放在应用程序属性中并配置 DeadLetterPublishingRecoverer
以找到相应的 DLQ (注意,这是 Kotlin):
val recoverer = DeadLetterPublishingRecoverer(template) { record, _ ->
val dlq = kafkaProperties.topics.values.firstOrNull { it.main == record.topic() || it.retry == record.topic() }!!.dlq
return@DeadLetterPublishingRecoverer TopicPartition(dlq, -1)
}
一切正常。但是现在我想添加另一个 @KafkaListener
来收听与另一个收听者相同的主题 。因此我需要给这个监听器一个单独的消费者组。
但是,如果我使用与上述相同的逻辑来查找相应的 DLQ 主题,则两个侦听器之一将使用另一个的 DLQ,因为它不在此处查看消费者组。
所以我的问题是:在 Spring Kafka 中,有什么方法可以找到错误发生在哪个 消费者组 (又名 groupId
)?我查看了文档和源代码,但自己找不到。感谢您的帮助。
消费者的group.id
可通过调用KafkaUtils.getGroupId()
获得(当容器线程启动时它存储在ThreadLocal
中)。
我有一个使用 DLQ 方法的项目,在 @KafkaListener
中出现任何异常时,错误将发送到具有结构 error-<topic>-<consumergroup>
的主题。当我们想要手动重试来自该 DLQ 的 kafka 消息时,我们会将其生成到具有相似结构 retry-<topic>-<consumergroup>
的主题。因此,例如,如果我们有一个主要主题 foo
,由消费者组 bar
消费,我们将有主题:foo
、error-foo-bar
和 retry-foo-bar
.
这样,我们可以为特定的消费者组重试消息。消息处理程序从主要主题和重试主题中读取。
由于我们的项目中有多个监听器都使用同一个容器,我们将主题(主要、错误和重试)放在应用程序属性中并配置 DeadLetterPublishingRecoverer
以找到相应的 DLQ (注意,这是 Kotlin):
val recoverer = DeadLetterPublishingRecoverer(template) { record, _ ->
val dlq = kafkaProperties.topics.values.firstOrNull { it.main == record.topic() || it.retry == record.topic() }!!.dlq
return@DeadLetterPublishingRecoverer TopicPartition(dlq, -1)
}
一切正常。但是现在我想添加另一个 @KafkaListener
来收听与另一个收听者相同的主题 。因此我需要给这个监听器一个单独的消费者组。
但是,如果我使用与上述相同的逻辑来查找相应的 DLQ 主题,则两个侦听器之一将使用另一个的 DLQ,因为它不在此处查看消费者组。
所以我的问题是:在 Spring Kafka 中,有什么方法可以找到错误发生在哪个 消费者组 (又名 groupId
)?我查看了文档和源代码,但自己找不到。感谢您的帮助。
消费者的group.id
可通过调用KafkaUtils.getGroupId()
获得(当容器线程启动时它存储在ThreadLocal
中)。