@RetryableTopic 在使用 topicPartitions 重置偏移量时表现出奇怪的行为 - spring kafka
@RetryableTopic showing weird behaviour when using with topicPartitions to reset offset - spring kafka
我正在尝试使用 @RetryableTopic 来取消阻塞重试和 topicPartitions 以便从头开始读取消息。
下面是我的监听器(我只有一个分区):
@Slf4j
@Component
public class SingleTopicRetryConsumer {
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
message.key(),
message.value(),
message.topic(),
LocalDateTime.now());
}
@DltHandler
public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
message.key(),
message.value(),
message.topic(),
LocalDateTime.now());
}
}
配置属性:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
上面的代码开始发出奇怪的行为,它两次从主侦听器读取相同的消息,一次从 DLT 读取相同的消息,但仅从主要主题读取。
日志:
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer
- message consumed -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer -
message consumed -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer -
message consumed at DLT -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
如果我在没有 topicPartitions
的情况下通过删除下面的行来使用上面的代码,侦听器将按预期工作。
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}
关于为什么会发生这种情况的任何线索?
更新:此错误已在 Spring 中修复,适用于 Apache Kafka 2.8.5
。
这是一个错误。问题是我们将重试主题名称设置为端点的 topics
属性,而不是将其设置为 topicPartition
。因此,我们最终为主要端点提供了两个侦听器,为重试主题提供了 none。
如果可以,请打开一个问题:https://github.com/spring-projects/spring-kafka/issues
不确定是否有使用主题分区解决此问题的解决方法 - 它应该在几周内在 2.8.5
版本中修复。
感谢报告。
我正在尝试使用 @RetryableTopic 来取消阻塞重试和 topicPartitions 以便从头开始读取消息。
下面是我的监听器(我只有一个分区):
@Slf4j
@Component
public class SingleTopicRetryConsumer {
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
message.key(),
message.value(),
message.topic(),
LocalDateTime.now());
}
@DltHandler
public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
message.key(),
message.value(),
message.topic(),
LocalDateTime.now());
}
}
配置属性:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
上面的代码开始发出奇怪的行为,它两次从主侦听器读取相同的消息,一次从 DLT 读取相同的消息,但仅从主要主题读取。
日志:
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer
- message consumed -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer -
message consumed -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO c.m.s.c.n.SingleTopicRetryConsumer -
message consumed at DLT -
key: product1 ,
value: This is Product1,
topic: products,
at: 2022-04-07T15:10:50.950810
如果我在没有 topicPartitions
的情况下通过删除下面的行来使用上面的代码,侦听器将按预期工作。
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}
关于为什么会发生这种情况的任何线索?
更新:此错误已在 Spring 中修复,适用于 Apache Kafka 2.8.5
。
这是一个错误。问题是我们将重试主题名称设置为端点的 topics
属性,而不是将其设置为 topicPartition
。因此,我们最终为主要端点提供了两个侦听器,为重试主题提供了 none。
如果可以,请打开一个问题:https://github.com/spring-projects/spring-kafka/issues
不确定是否有使用主题分区解决此问题的解决方法 - 它应该在几周内在 2.8.5
版本中修复。
感谢报告。