Spring AMQP RPC 消费者并抛出异常

Spring AMQP RPC consumer and throw exception

我有一个处于 RPC 模式的消费者 (RabbitListner),我想知道是否可以抛出可以由发布者处理的异常。

为了让我的解释更清楚,案例如下:

我尝试使用 AmqpRejectAndDontRequeueException,但我的发布者没有收到异常,只是一个空的响应。

是否有可能完成,或者这样实施可能不是一个好的做法?

编辑 1:

@GaryRussel 回复后我的问题得到了解决:

  1. 我为 RabbitListner 创建了一个错误处理程序:

     @Configuration
     public class RabbitErrorHandler implements         RabbitListenerErrorHandler {
     @Override public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) {
    throw e;
    }
    

    }

  2. 将bean定义到配置文件中:

    @配置 public class RabbitConfig 扩展 RabbitConfiguration {

    @Bean
    public RabbitTemplate getRabbitTemplate() {
        Message.addWhiteListPatterns(RabbitConstants.CLASSES_TO_SEND_OVER_RABBITMQ);
    return new RabbitTemplate(this.connectionFactory());
    

    }

    /**
    * Define the RabbitErrorHandle
    * @return Initialize RabbitErrorHandle bean
    */
    @Bean
    public RabbitErrorHandler rabbitErrorHandler() {
       return new RabbitErrorHandler();
    }
    }
    
  3. 使用参数创建 @RabbitListner,其中 rabbitErrorHandler 是我之前定义的 bean:

    @Override
    @RabbitListener(queues = "${rabbit.queue}"
        , errorHandler = "rabbitErrorHandler"
        , returnExceptions = "true")
     public ReturnObject receiveMessage(Message message) {
    
  4. 对于 RabbitTemplate,我设置了这个属性:

       rabbitTemplate.setMessageConverter(new RemoteInvocationAwareMessageConverterAdapter());
    

当消息被消费者威胁,但它发送了一个错误,我得到一个包含原始异常的RemoteInvocationResult到e.getCause().getCause().

您必须 return 一条消息作为错误,消费应用程序可以选择将其视为异常。但是,我认为正常的异常处理流程不适用于消息传递。您的发布应用程序(RPC 服务的使用者)需要知道什么地方会出错,并进行编程以处理这些可能性。

参见 returnExceptions 属性 关于 @RabbitListener(自 2.0 起)。 Docs here.

The returnExceptions attribute, when true will cause exceptions to be returned to the sender. The exception is wrapped in a RemoteInvocationResult object.

On the sender side, there is an available RemoteInvocationAwareMessageConverterAdapter which, if configured into the RabbitTemplate, will re-throw the server-side exception, wrapped in an AmqpRemoteException. The stack trace of the server exception will be synthesized by merging the server and client stack traces.

Important

This mechanism will generally only work with the default SimpleMessageConverter, which uses Java serialization; exceptions are generally not "Jackson-friendly" so can’t be serialized to JSON. If you are using JSON, consider using an errorHandler to return some other Jackson-friendly Error object when an exception is thrown.

对我有用的是:

在“服务”方面:

  • 服务

    @RabbitListener(id = "test1", containerFactory ="BEAN CONTAINER FACTORY", 
    queues = "TEST QUEUE", returnExceptions = "true")
    DataList getData() {
      // this exception will be transformed by rabbit error handler to a RemoteInvocationResult
     throw new IllegalStateException("mon expecion");
     //return dataHelper.loadAllData();
    }
    

在“请求”方面:

  • 服务

    public void fetchData() throws AmqpRemoteException {
      var response = (DataList) amqpTemplate.convertSendAndReceive("TEST EXCHANGE", "ROUTING NAME", new Object());
    
      Optional.ofNullable(response)
             .ifPresentOrElse(this::setDataContent, this::handleNoData);
    }
    
  • 配置

    @Bean
    AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
       var rabbitTemplate = new RabbitTemplate(connectionFactory);
       rabbitTemplate.setMessageConverter(messageConverter);
       return rabbitTemplate;
    }
    @Bean
    MessageConverter jsonMessageConverter() {
       ObjectMapper objectMapper = new ObjectMapper();
       objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
       objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
       objectMapper.registerModule(new JavaTimeModule());
    
       var jsonConverter = new Jackson2JsonMessageConverter(objectMapper);
    
       DefaultClassMapper classMapper = new DefaultClassMapper();
       Map<String, Class<?>> idClassMapping = Map.of(
          DataList.class.getName(), DataList.class,
          RemoteInvocationResult.class.getName(), RemoteInvocationResult.class
       );
       classMapper.setIdClassMapping(idClassMapping);
    
       jsonConverter.setClassMapper(classMapper);
    
       // json converter with returned exception awareness
       // this will transform RemoteInvocationResult into a AmqpRemoteException 
       return new RemoteInvocationAwareMessageConverterAdapter(jsonConverter);
    }