使用 RetryListener 获取 org.springframework.retry.TerminatedRetryException
Getting org.springframework.retry.TerminatedRetryException with RetryListener
我正在使用 spring-kafka 2.2。8.RELEASE 并使用 @KafkaListener 注释创建消费者,这是我的消费者配置代码。
@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(MyConsumerConfig.getConfigs());
}
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{myKafkaRetryListener});
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(Integer.parseInt(3));
retryTemplate.setRetryPolicy(retryPolicy);
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(500);
//As per the spring-kafka documentation, maxInterval (60000 ms) should be set less than max.poll.interval.ms (600000 ms)
exponentialBackOffPolicy.setMaxInterval(60000);
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
return retryTemplate;
}
这是我的自定义重试侦听器代码:
@Component
public class MyRetryListener implements RetryListener {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("##### IN open method");
return false;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("##### IN close method");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("##### Got an error and will retry");
}
}
现在,当我向测试主题发送消息时,我在消费者中抛出 TimeoutException 以便触发重试,这是我的消费者代码。
@KafkaListener(topics = "CONSUMER_RETRY_TEST_TOPIC")
public void listen(ConsumerRecord message) throws RetriableException {
System.out.println("CONSUMER_RETRY testing - Received message with key "+message.key()+" on topic " + CONSUMER_RETRY_TEST_TOPIC + " \n \n ");
throw new TimeoutException();
}
使用上面的代码配置,重试不会被触发,我的自定义重试侦听器的 'onError' 方法也不会被调用,我收到以下错误。请建议我在这里缺少什么?
org.springframework.retry.TerminatedRetryException: Retry terminated abnormally by interceptor before first attempt
请参阅 RetryListener.open() 的 JavaDocs。
<T,E extends Throwable> boolean open(RetryContext context,
RetryCallback<T,E> callback)
Called before the first attempt in a retry. For instance, implementers can set up state that is needed by the policies in the RetryOperations. The whole retry can be vetoed by returning false from this method, in which case a TerminatedRetryException will be thrown.
Type Parameters:
T - the type of object returned by the callback
E - the type of exception it declares may be thrown
Parameters:
context - the current RetryContext.
callback - the current RetryCallback.
Returns:
true if the retry should proceed.
您需要 return 正确而不是错误。
我正在使用 spring-kafka 2.2。8.RELEASE 并使用 @KafkaListener 注释创建消费者,这是我的消费者配置代码。
@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(MyConsumerConfig.getConfigs());
}
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{myKafkaRetryListener});
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(Integer.parseInt(3));
retryTemplate.setRetryPolicy(retryPolicy);
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(500);
//As per the spring-kafka documentation, maxInterval (60000 ms) should be set less than max.poll.interval.ms (600000 ms)
exponentialBackOffPolicy.setMaxInterval(60000);
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
return retryTemplate;
}
这是我的自定义重试侦听器代码:
@Component
public class MyRetryListener implements RetryListener {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("##### IN open method");
return false;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("##### IN close method");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("##### Got an error and will retry");
}
}
现在,当我向测试主题发送消息时,我在消费者中抛出 TimeoutException 以便触发重试,这是我的消费者代码。
@KafkaListener(topics = "CONSUMER_RETRY_TEST_TOPIC")
public void listen(ConsumerRecord message) throws RetriableException {
System.out.println("CONSUMER_RETRY testing - Received message with key "+message.key()+" on topic " + CONSUMER_RETRY_TEST_TOPIC + " \n \n ");
throw new TimeoutException();
}
使用上面的代码配置,重试不会被触发,我的自定义重试侦听器的 'onError' 方法也不会被调用,我收到以下错误。请建议我在这里缺少什么?
org.springframework.retry.TerminatedRetryException: Retry terminated abnormally by interceptor before first attempt
请参阅 RetryListener.open() 的 JavaDocs。
<T,E extends Throwable> boolean open(RetryContext context,
RetryCallback<T,E> callback)
Called before the first attempt in a retry. For instance, implementers can set up state that is needed by the policies in the RetryOperations. The whole retry can be vetoed by returning false from this method, in which case a TerminatedRetryException will be thrown.
Type Parameters:
T - the type of object returned by the callback
E - the type of exception it declares may be thrown
Parameters:
context - the current RetryContext.
callback - the current RetryCallback.
Returns:
true if the retry should proceed.
您需要 return 正确而不是错误。