Spring AMQP 恢复器不能与 SimpleMessageListenerContainer 一起工作
Spring AMQP recoverer not working with SimpleMessageListenerContainer
我正在尝试在 spring amqp 中实现恢复。我使用下面的代码来实现相同的
RetryOperationsInterceptor retryInterceptorBuilder =RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new CustomRejectAndDontRequeueRecoverer())
.build();
container.setAdviceChain(new RetryOperationsInterceptor[]{retryInterceptorBuilder});
上述容器是SimpleMessageListenerContainer
的实例。现在我在我的一个接收器中投掷 ClassCastException
.
public class CustomRejectAndDontRequeueRecoverer implements MessageRecoverer {
private static Logger logger = //created some logger instance.
//Overriding the method to do custom task when the retries are exhausted, like insert in database.
@Override
public void recover(Message message, Throwable cause) {
logger.error("The recovery method is called","","");
throw new RuntimeException(cause);
}
}
请指出正确的方向。
更新:
我的 CustomMessagingPostProcessor 中抛出了一些异常。如果在 onMessage() 方法中抛出异常,我的 RetryOperationsInterceptor 只会重试消息。添加 CustomMessagingPostProcessor 的定义:-
public class MTMessagingPostProcessor implements MessagePostProcessor{
/**
* {@inheritDoc}
*/
@Override
public Message postProcessMessage(Message message) {
logger.xdebug("Inside MTMessagingPostProcessor",
//Throwing exception to show that some exception can be thrown in original code and I want to requeue messages to come here for n number of times.
throw new RuntimeException("TEST");
//return message;
}
public void setTenantProvider(TenantProvider tenantProvider) {
this.tenantProvider = tenantProvider;
}
}
如果在 MTMessagingPostProcessor 中抛出异常,我想将消息重新排队 n 次,这不是由拦截器实现的,因为它会在侦听器的 onMessage() 方法中重试消息。
如果您的意思是您在 afterReceiveMessagePostProcessors
容器 属性 中提供了一个 MessagePostProcessor
,那些 post 处理器在建议链的范围之外被调用(重试拦截器).
您不能 "retry" 此类 post 处理器抛出异常。
重试拦截器仅适用于侦听器本身。
我正在尝试在 spring amqp 中实现恢复。我使用下面的代码来实现相同的
RetryOperationsInterceptor retryInterceptorBuilder =RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new CustomRejectAndDontRequeueRecoverer())
.build();
container.setAdviceChain(new RetryOperationsInterceptor[]{retryInterceptorBuilder});
上述容器是SimpleMessageListenerContainer
的实例。现在我在我的一个接收器中投掷 ClassCastException
.
public class CustomRejectAndDontRequeueRecoverer implements MessageRecoverer {
private static Logger logger = //created some logger instance.
//Overriding the method to do custom task when the retries are exhausted, like insert in database.
@Override
public void recover(Message message, Throwable cause) {
logger.error("The recovery method is called","","");
throw new RuntimeException(cause);
}
}
请指出正确的方向。
更新:
我的 CustomMessagingPostProcessor 中抛出了一些异常。如果在 onMessage() 方法中抛出异常,我的 RetryOperationsInterceptor 只会重试消息。添加 CustomMessagingPostProcessor 的定义:-
public class MTMessagingPostProcessor implements MessagePostProcessor{
/**
* {@inheritDoc}
*/
@Override
public Message postProcessMessage(Message message) {
logger.xdebug("Inside MTMessagingPostProcessor",
//Throwing exception to show that some exception can be thrown in original code and I want to requeue messages to come here for n number of times.
throw new RuntimeException("TEST");
//return message;
}
public void setTenantProvider(TenantProvider tenantProvider) {
this.tenantProvider = tenantProvider;
}
}
如果在 MTMessagingPostProcessor 中抛出异常,我想将消息重新排队 n 次,这不是由拦截器实现的,因为它会在侦听器的 onMessage() 方法中重试消息。
如果您的意思是您在 afterReceiveMessagePostProcessors
容器 属性 中提供了一个 MessagePostProcessor
,那些 post 处理器在建议链的范围之外被调用(重试拦截器).
您不能 "retry" 此类 post 处理器抛出异常。
重试拦截器仅适用于侦听器本身。