Spring AMQP RPC 消费者并抛出异常
Spring AMQP RPC consumer and throw exception
我有一个处于 RPC 模式的消费者 (RabbitListner),我想知道是否可以抛出可以由发布者处理的异常。
为了让我的解释更清楚,案例如下:
- 发布者以RPC方式发送消息
- 消费者收到消息,检查消息的有效性,如果消息不能被计数,因为缺少参数,那么我想抛出异常。异常可以是特定的业务异常或特定的 AmqpException,但我希望发布者可以处理此异常,如果它没有进入超时。
我尝试使用 AmqpRejectAndDontRequeueException,但我的发布者没有收到异常,只是一个空的响应。
是否有可能完成,或者这样实施可能不是一个好的做法?
编辑 1:
@GaryRussel 回复后我的问题得到了解决:
我为 RabbitListner 创建了一个错误处理程序:
@Configuration
public class RabbitErrorHandler implements RabbitListenerErrorHandler {
@Override public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) {
throw e;
}
}
将bean定义到配置文件中:
@配置
public class RabbitConfig 扩展 RabbitConfiguration {
@Bean
public RabbitTemplate getRabbitTemplate() {
Message.addWhiteListPatterns(RabbitConstants.CLASSES_TO_SEND_OVER_RABBITMQ);
return new RabbitTemplate(this.connectionFactory());
}
/**
* Define the RabbitErrorHandle
* @return Initialize RabbitErrorHandle bean
*/
@Bean
public RabbitErrorHandler rabbitErrorHandler() {
return new RabbitErrorHandler();
}
}
使用参数创建 @RabbitListner,其中 rabbitErrorHandler 是我之前定义的 bean:
@Override
@RabbitListener(queues = "${rabbit.queue}"
, errorHandler = "rabbitErrorHandler"
, returnExceptions = "true")
public ReturnObject receiveMessage(Message message) {
对于 RabbitTemplate,我设置了这个属性:
rabbitTemplate.setMessageConverter(new RemoteInvocationAwareMessageConverterAdapter());
当消息被消费者威胁,但它发送了一个错误,我得到一个包含原始异常的RemoteInvocationResult到e.getCause().getCause().
您必须 return 一条消息作为错误,消费应用程序可以选择将其视为异常。但是,我认为正常的异常处理流程不适用于消息传递。您的发布应用程序(RPC 服务的使用者)需要知道什么地方会出错,并进行编程以处理这些可能性。
参见 returnExceptions
属性 关于 @RabbitListener
(自 2.0 起)。 Docs here.
The returnExceptions
attribute, when true
will cause exceptions to be returned to the sender. The exception is wrapped in a RemoteInvocationResult
object.
On the sender side, there is an available RemoteInvocationAwareMessageConverterAdapter
which, if configured into the RabbitTemplate
, will re-throw the server-side exception, wrapped in an AmqpRemoteException
. The stack trace of the server exception will be synthesized by merging the server and client stack traces.
Important
This mechanism will generally only work with the default SimpleMessageConverter, which uses Java serialization; exceptions are generally not "Jackson-friendly" so can’t be serialized to JSON. If you are using JSON, consider using an errorHandler to return some other Jackson-friendly Error object when an exception is thrown.
对我有用的是:
在“服务”方面:
服务
@RabbitListener(id = "test1", containerFactory ="BEAN CONTAINER FACTORY",
queues = "TEST QUEUE", returnExceptions = "true")
DataList getData() {
// this exception will be transformed by rabbit error handler to a RemoteInvocationResult
throw new IllegalStateException("mon expecion");
//return dataHelper.loadAllData();
}
在“请求”方面:
服务
public void fetchData() throws AmqpRemoteException {
var response = (DataList) amqpTemplate.convertSendAndReceive("TEST EXCHANGE", "ROUTING NAME", new Object());
Optional.ofNullable(response)
.ifPresentOrElse(this::setDataContent, this::handleNoData);
}
配置
@Bean
AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
var rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
MessageConverter jsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.registerModule(new JavaTimeModule());
var jsonConverter = new Jackson2JsonMessageConverter(objectMapper);
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = Map.of(
DataList.class.getName(), DataList.class,
RemoteInvocationResult.class.getName(), RemoteInvocationResult.class
);
classMapper.setIdClassMapping(idClassMapping);
jsonConverter.setClassMapper(classMapper);
// json converter with returned exception awareness
// this will transform RemoteInvocationResult into a AmqpRemoteException
return new RemoteInvocationAwareMessageConverterAdapter(jsonConverter);
}
我有一个处于 RPC 模式的消费者 (RabbitListner),我想知道是否可以抛出可以由发布者处理的异常。
为了让我的解释更清楚,案例如下:
- 发布者以RPC方式发送消息
- 消费者收到消息,检查消息的有效性,如果消息不能被计数,因为缺少参数,那么我想抛出异常。异常可以是特定的业务异常或特定的 AmqpException,但我希望发布者可以处理此异常,如果它没有进入超时。
我尝试使用 AmqpRejectAndDontRequeueException,但我的发布者没有收到异常,只是一个空的响应。
是否有可能完成,或者这样实施可能不是一个好的做法?
编辑 1:
@GaryRussel 回复后我的问题得到了解决:
我为 RabbitListner 创建了一个错误处理程序:
@Configuration public class RabbitErrorHandler implements RabbitListenerErrorHandler { @Override public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) { throw e; }
}
将bean定义到配置文件中:
@配置 public class RabbitConfig 扩展 RabbitConfiguration {
@Bean public RabbitTemplate getRabbitTemplate() { Message.addWhiteListPatterns(RabbitConstants.CLASSES_TO_SEND_OVER_RABBITMQ); return new RabbitTemplate(this.connectionFactory());
}
/** * Define the RabbitErrorHandle * @return Initialize RabbitErrorHandle bean */ @Bean public RabbitErrorHandler rabbitErrorHandler() { return new RabbitErrorHandler(); } }
使用参数创建 @RabbitListner,其中 rabbitErrorHandler 是我之前定义的 bean:
@Override @RabbitListener(queues = "${rabbit.queue}" , errorHandler = "rabbitErrorHandler" , returnExceptions = "true") public ReturnObject receiveMessage(Message message) {
对于 RabbitTemplate,我设置了这个属性:
rabbitTemplate.setMessageConverter(new RemoteInvocationAwareMessageConverterAdapter());
当消息被消费者威胁,但它发送了一个错误,我得到一个包含原始异常的RemoteInvocationResult到e.getCause().getCause().
您必须 return 一条消息作为错误,消费应用程序可以选择将其视为异常。但是,我认为正常的异常处理流程不适用于消息传递。您的发布应用程序(RPC 服务的使用者)需要知道什么地方会出错,并进行编程以处理这些可能性。
参见 returnExceptions
属性 关于 @RabbitListener
(自 2.0 起)。 Docs here.
The
returnExceptions
attribute, whentrue
will cause exceptions to be returned to the sender. The exception is wrapped in aRemoteInvocationResult
object.On the sender side, there is an available
RemoteInvocationAwareMessageConverterAdapter
which, if configured into theRabbitTemplate
, will re-throw the server-side exception, wrapped in anAmqpRemoteException
. The stack trace of the server exception will be synthesized by merging the server and client stack traces.Important
This mechanism will generally only work with the default SimpleMessageConverter, which uses Java serialization; exceptions are generally not "Jackson-friendly" so can’t be serialized to JSON. If you are using JSON, consider using an errorHandler to return some other Jackson-friendly Error object when an exception is thrown.
对我有用的是:
在“服务”方面:
服务
@RabbitListener(id = "test1", containerFactory ="BEAN CONTAINER FACTORY", queues = "TEST QUEUE", returnExceptions = "true") DataList getData() { // this exception will be transformed by rabbit error handler to a RemoteInvocationResult throw new IllegalStateException("mon expecion"); //return dataHelper.loadAllData(); }
在“请求”方面:
服务
public void fetchData() throws AmqpRemoteException { var response = (DataList) amqpTemplate.convertSendAndReceive("TEST EXCHANGE", "ROUTING NAME", new Object()); Optional.ofNullable(response) .ifPresentOrElse(this::setDataContent, this::handleNoData); }
配置
@Bean AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) { var rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } @Bean MessageConverter jsonMessageConverter() { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); objectMapper.registerModule(new JavaTimeModule()); var jsonConverter = new Jackson2JsonMessageConverter(objectMapper); DefaultClassMapper classMapper = new DefaultClassMapper(); Map<String, Class<?>> idClassMapping = Map.of( DataList.class.getName(), DataList.class, RemoteInvocationResult.class.getName(), RemoteInvocationResult.class ); classMapper.setIdClassMapping(idClassMapping); jsonConverter.setClassMapper(classMapper); // json converter with returned exception awareness // this will transform RemoteInvocationResult into a AmqpRemoteException return new RemoteInvocationAwareMessageConverterAdapter(jsonConverter); }