Kafka messages are sent to a single topic instead of n retry topics and a DLT
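I am trying to implement n retry topics with a DLT, but every message is pushed to a single topic, test-topic-retry-0. That topic ends up with 3 duplicate records, whereas I expected:
- test-topic-retry-0 -> 1 message after the initial failure
- test-topic-retry-1 -> 1 message after the first retry fails
- test-topic-dlt -> 1 message after all retries have failed
Kafka appears to be pushing all messages to the same topic.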
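To make the expected layout concrete, here is a minimal sketch in plain Java that lists the topics a record should pass through for a given number of attempts. It is only a model of the naming described above (index suffix appended after the retry suffix), not Spring Kafka's own code; the class `RetryTopicNames` and the method `expectedTopics` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: models the expected topic chain.
final class RetryTopicNames {

    static List<String> expectedTopics(String mainTopic, int maxAttempts,
                                       String retrySuffix, String dltSuffix) {
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);
        // Assumption: maxAttempts counts the first delivery on the main topic,
        // so there are maxAttempts - 1 indexed retry topics.
        for (int i = 0; i < maxAttempts - 1; i++) {
            topics.add(mainTopic + retrySuffix + "-" + i);
        }
        // The dead-letter topic comes last.
        topics.add(mainTopic + dltSuffix);
        return topics;
    }

    public static void main(String[] args) {
        // Prints one topic name per line for the settings used in this question.
        expectedTopics("test-topic", 3, "-retry", "-dlt").forEach(System.out::println);
    }
}
```

With 3 attempts and the suffixes "-retry" and "-dlt", this yields test-topic, test-topic-retry-0, test-topic-retry-1 and test-topic-dlt, which is the distribution I expected to see.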
Kafka configuration:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(false);
        factory.setConcurrency(1);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        return factory;
    }
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(MAX_POLL_RECORDS_CONFIG, 100);
        props.put(HEARTBEAT_INTERVAL_MS_CONFIG, 2000);
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        return props;
    }
    @Bean
    public KafkaAdmin kadmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
        return new KafkaAdmin(configs);
    }
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, Object> prodTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    public RetryTopicConfiguration retryTopicConfig(KafkaTemplate<String, Object> template,
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        template.getProducerFactory().getConfigurationProperties();
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(2000, 5, Long.MAX_VALUE)
                .maxAttempts(3)
                .timeoutAfter(-1)
                .autoCreateTopicsWith(3, (short) 3)
                .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
                .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
                .retryTopicSuffix("-retry")
                .dltSuffix("-dlt")
                .listenerFactory(factory)
                .create(template);
    }
Listener:
    @KafkaListener(topics = "test-topic")
    public void onMessage(ConsumerRecord<String, String> r) {
        throw new RuntimeException("test");
    }
    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> r) {
        log.error("test dlt");
    }
I copied your code almost exactly and it works fine for me.
However, I did have to add .dltHandlerMethod:
        return RetryTopicConfigurationBuilder
                .newInstance()
                .exponentialBackoff(2000, 5, Long.MAX_VALUE)
                .maxAttempts(3)
                .timeoutAfter(-1)
                .autoCreateTopicsWith(3, (short) 1)
                .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
                .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
                .retryTopicSuffix("-retry")
                .dltSuffix("-dlt")
                .listenerFactory(factory)
                .dltHandlerMethod(Listener.class, "handleDlt")
                .create(template);
    @Component
    class Listener {
        private static final Logger log = LoggerFactory.getLogger(Listener.class);
        @KafkaListener(id = "so69453282", topics = "test-topic")
        public void onMessage(ConsumerRecord<String, String> r) {
            log.info(r.topic());
            throw new RuntimeException("test");
        }
        @DltHandler
        public void handleDlt(ConsumerRecord<String, String> r) {
            log.error("test dlt");
        }
    }
2021-10-05 13:03:12,451 [so69453282-0-C-1] test-topic
2021-10-05 13:03:14,485 [so69453282-retry-0-0-C-1] test-topic-retry-0
2021-10-05 13:03:24,523 [so69453282-retry-1-0-C-1] test-topic-retry-1
2021-10-05 13:03:25,031 [so69453282-dlt-0-C-1] test dlt
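The gaps between those log lines (about 2 s, then about 10 s) are consistent with exponentialBackoff(2000, 5, Long.MAX_VALUE) and maxAttempts(3). As a rough sanity check, the sketch below computes the delay before each retry topic in plain Java. It is a simplified model under the assumption that each delay is the previous one times the multiplier, capped at the maximum; it is not the library's implementation, and `BackoffDelays` and `delays` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: models exponential back-off delays.
final class BackoffDelays {

    static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> out = new ArrayList<>();
        double next = initialMs;
        // Assumption: one delay per retry topic, i.e. maxAttempts - 1 delays in total.
        for (int i = 0; i < maxAttempts - 1; i++) {
            // Cap each delay at maxMs before recording it.
            out.add((long) Math.min(next, (double) maxMs));
            next *= multiplier;
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [2000, 10000] for the settings used in this thread.
        System.out.println(delays(2000, 5, Long.MAX_VALUE, 3));
    }
}
```

So a record should wait roughly 2 s before test-topic-retry-0 and roughly 10 s before test-topic-retry-1, then move to the DLT. That matches the timestamps above.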