Spring annotation @Retryable - how to set an interceptor
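I am using the @Retryable annotation on a method in a @Service class:
@Service
@EnableRetry
public class PushService {
    @Retryable(maxAttempts=5)
    public Result pushIt(myMessage messageIn) {
        ...
    }
}
It works great: I receive a message directly from RabbitMQ, and it is not acknowledged until either there is no error or the number of attempts reaches 5, at which point the message goes straight to the DLQ, just as I want.
My only problem is that I need to set maxAttempts dynamically from a properties file. The solution should be to set an interceptor, but merely having one causes an error, for example when I have: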
@Service
@EnableRetry
public class PushService {
    @Retryable(interceptor="myInterceptor")
    public Result pushIt(myMessage messageIn) {
        ...
    }
}
where myInterceptor is defined as:
@Bean
public StatefulRetryOperationsInterceptor myInterceptor() {
    return RetryInterceptorBuilder.stateful().maxAttempts(5).build();
}
I get an infinite loop with the following exception:
2015-04-08 07:12:10,970 GMT [SimpleAsyncTaskExecutor-1] (ConditionalRejectingErrorHandler.java:67) WARN listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:103)
at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:132)
at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:118)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
at com.acme.push.service.PushService$$EnhancerBySpringCGLIB$d503bc1.pushMessage(<generated>)
at com.acme.push.receiver.PushListener.onMessage(PushListener.java:42)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
... 10 more
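I'm pretty sure I'm keeping this too simple, but I don't know what could be causing this error or how to fix it. Does anyone know what is going on?
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder is not meant to be used with the @Retryable annotation.
It is intended to be used with the adviceChain of the SimpleRabbitListenerContainerFactory class.
See the reference documentation (receiving-messages) and the SimpleMessageListenerContainer#invokeListener signature: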
@Override
protected void invokeListener(Channel channel, Message message) throws Exception {
    proxy.invokeListener(channel, message);
}
Once you configure the adviceChain with an appropriate RetryInterceptor, your annotation will no longer serve any purpose.
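To make the failure mode concrete, here is a minimal plain-Java sketch of why an advice written for invokeListener(Channel, Message) can fail on a one-argument method. It is not Spring code: ListenerKeySketch and its methods are hypothetical names, and it assumes the stateful interceptor's key generator reads the message from the second argument, which is what the getKey frame in the stack trace and the signature above suggest.

```java
// Hypothetical stand-in, not Spring code: mimics a key extractor that assumes
// it is advising invokeListener(Channel channel, Message message).
final class ListenerKeySketch {

    // Assumption: the message is taken from the second argument (index 1).
    static Object keyFor(Object[] args) {
        return args[1];
    }

    // Returns the simple name of the exception raised for the given argument
    // list, or "none" if the key could be extracted.
    static String outcome(Object[] args) {
        try {
            keyFor(args);
            return "none";
        } catch (ArrayIndexOutOfBoundsException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

Calling outcome with two arguments (a channel and a message) succeeds, while calling it with the single argument of pushIt(messageIn) runs past the end of the array, which would match the "ArrayIndexOutOfBoundsException: 1" in the question.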
I finally got the flexibility I needed without using the @Retryable annotation.
I created a RetryAdvice with my delay and maximum-attempts parameters:
@Bean
public MethodInterceptor retryAdvice() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(delay);
    return RetryInterceptorBuilder.stateless().backOffPolicy(backOffPolicy)
            .maxAttempts(maxAttempts).build();
}
and I inserted the Advice into the adviceChain of the ListenerContainer:
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(pushConnectionFactory());
    container.setQueues(pushQueue());
    container.setMessageListener(pushListener());
    Advice[] adviceChain = new Advice[] { retryAdvice() };
    container.setAdviceChain(adviceChain);
    return container;
}
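This way, whenever my Listener throws
throw new AmqpRejectAndDontRequeueException(cause);
the container retries the specified number of times with the desired delay, after which the exception is propagated and the message is delivered to the DLQ.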
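For readers who want to see the behaviour described above without Spring, the following is a rough plain-Java sketch of a stateless retry with a fixed back-off, with the attempt limit read from properties text. It only illustrates the semantics and is not how Spring Retry is implemented; RetrySketch, its methods, and the push.retry.maxAttempts key are all hypothetical names chosen for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.Callable;

// Hypothetical illustration only; not part of Spring Retry or Spring AMQP.
final class RetrySketch {

    // Reads the attempt limit from properties text. The key name is an
    // assumption made for this example; it defaults to 3 when absent.
    static int maxAttemptsFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Integer.parseInt(props.getProperty("push.retry.maxAttempts", "3"));
    }

    // Calls the task up to maxAttempts times, sleeping delayMillis between
    // failed attempts, and rethrows the last failure once attempts run out.
    static <T> T callWithRetry(Callable<T> task, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // fixed back-off between attempts
                }
            }
        }
        throw last;
    }
}
```

In the container setup above, the final rethrow corresponds to the point where the exception is propagated and the broker routes the message to the DLQ.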