在 spring kafka 中使用 SeekToCurrentErrorHandler 时如何设置重试间隔时间
How to set Retry Interval time when using SeekToCurrentErrorHandler in spring kafka
我正在 spring-boot 应用程序中使用 SeekToCurrentErrorHandler 处理容器侦听器错误。如果发生异常,我想设置重试间隔,它应该等待一段时间并重试,直到最大尝试次数。
我尝试通过设置备份策略来添加 RetryTemplate。但它没有正常工作。当错误发生最大尝试时间时,它将调用 SeekToCurrentErrorHandler 2 次。
@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
return simpleRetryPolicy;
}
@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(retryInterval);
return backOffPolicy;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
ChainedKafkaTransactionManager<String, String> chainedTM) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
// ------->> factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(chainedTM);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
// handle errors
}, retryMaxAttempts);
factory.setErrorHandler(errorHandler);
log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
return factory;
}
使用SeekToCurrentErrorHandler时如何设置retry Interval时间?
这是一个new feature in the next release (2.3)。
但是,您可以使用重试模板来执行此操作,但总尝试次数将是这两个属性的倍数。
我正在 spring-boot 应用程序中使用 SeekToCurrentErrorHandler 处理容器侦听器错误。如果发生异常,我想设置重试间隔,它应该等待一段时间并重试,直到最大尝试次数。
我尝试通过设置备份策略来添加 RetryTemplate。但它没有正常工作。当错误发生最大尝试时间时,它将调用 SeekToCurrentErrorHandler 2 次。
@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
return simpleRetryPolicy;
}
@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(retryInterval);
return backOffPolicy;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
ChainedKafkaTransactionManager<String, String> chainedTM) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
// ------->> factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(chainedTM);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
// handle errors
}, retryMaxAttempts);
factory.setErrorHandler(errorHandler);
log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
return factory;
}
使用SeekToCurrentErrorHandler时如何设置retry Interval时间?
这是一个new feature in the next release (2.3)。
但是,您可以使用重试模板来执行此操作,但总尝试次数将是这两个属性的倍数。