How can we use DeadLetterPublishingRecoverer with RetryTemplate?
I want to use a RetryTemplate together with a DeadLetterPublishingRecoverer.
How can I wire them so that the retry count and retryInterval come from the RetryTemplate, and the record is moved to the DLQ once the retries are exhausted?
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(retryInterval);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
        ChainedKafkaTransactionManager<String, String> chainedTM) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setSyncCommits(true);
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setTransactionManager(chainedTM);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 1));
    return factory;
}
You should do the recovery (publishing) within the retry logic rather than in the error handler. See:
factory.setRecoveryCallback(context -> {
    recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
            (Exception) context.getLastThrowable());
    return null;
});
where recoverer is the DeadLetterPublishingRecoverer.
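Putting the pieces together, here is a minimal sketch of the factory from the question, reworked so the recoverer runs as the retry recovery callback instead of inside the error handler (the injected KafkaTemplate bean and the omission of the transaction/ack settings are assumptions, not requirements):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // The RetryTemplate drives both the retry count and the interval between attempts.
    factory.setRetryTemplate(retryTemplate());
    // When the retries are exhausted, the recovery callback publishes the failed
    // record (stored in the retry context under the "record" attribute) to the DLT.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    factory.setRecoveryCallback(context -> {
        recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                (Exception) context.getLastThrowable());
        return null;
    });
    return factory;
}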
EDIT
/**
* Create an instance with the provided template and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic.
* @param template the {@link KafkaTemplate} to use for publishing.
* @param destinationResolver the resolving function.
*/
public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    this(Collections.singletonMap(Object.class, template), destinationResolver);
}
If the DLT does not have at least as many partitions as the original topic, you need a custom destination resolver:
(record, exception) -> new TopicPartition("my.DLT", -1)
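To wire that resolver in, pass the lambda to the two-argument constructor shown above; a short sketch (the topic name my.DLT is just the example name from the snippet):

// Publish all dead letters to "my.DLT"; the negative partition means no
// partition is set on the ProducerRecord, so Kafka selects one.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (record, exception) -> new TopicPartition("my.DLT", -1));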
With a negative partition, Kafka chooses the partition; the default resolver uses the same partition as the original record:
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
This is explained in the [documentation](https://docs.spring.io/spring-kafka/docs/2.2.7.RELEASE/reference/html/#dead-letters):

You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition. By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.
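If you stay with the default resolver, that means provisioning the DLT with at least as many partitions as the original topic; a hedged sketch (the topic names, partition count, and replication factor are assumptions):

// Assumption: the original topic has 10 partitions; the DLT mirrors that so the
// default same-partition resolver always has a valid target partition.
@Bean
public NewTopic myTopicDlt() {
    return new NewTopic("myTopic.DLT", 10, (short) 3);
}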