Default to content_type application/json while overriding isFatal from DefaultExceptionStrategy

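I don't want to require my clients to supply content_type application/json; I want it applied by default. I got that working.

I also tried to combine this with another example that implements a custom isFatal(Throwable t) for ConditionalRejectingErrorHandler. I can get my custom error handler to fire, but then the content_type property seems to be required again. I can't figure out how to make both work at the same time.

Any ideas?

The following successfully removes the need for content_type.

Edit: the code below does not work the way I thought. Older messages that carried the content_type application/json property must have been pulled from the queue.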

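import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@EnableRabbit
@Configuration
public class ExampleRabbitConfigurer implements RabbitListenerConfigurer {

    // Placeholder defaults are taken literally, so the default is written without quotes
    @Value("${spring.rabbitmq.host:localhost}")
    private String host;

    @Value("${spring.rabbitmq.port:5672}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    @Autowired
    private DefaultMessageHandlerMethodFactory messageHandlerMethodFactory;

    @Bean
    public MappingJackson2MessageConverter mappingJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    // Converts the payload for the @RabbitListener method arguments
    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(mappingJackson2MessageConverter);
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    }
}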

The code below overrides isFatal() from ConditionalRejectingErrorHandler. SimpleRabbitListenerContainerFactory.setMessageConverter() looks like it should serve the same purpose as DefaultMessageHandlerMethodFactory.setMessageConverter(). Apparently that is not the case.

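import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
// Package as of Spring AMQP 1.x/2.x
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;

@EnableRabbit
@Configuration
public class ExampleRabbitConfigurer {

    // Placeholder defaults are taken literally, so the default is written without quotes
    @Value("${spring.rabbitmq.host:localhost}")
    private String host;

    @Value("${spring.rabbitmq.port:5672}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Autowired
    ConnectionFactory connectionFactory;

    @Autowired
    Jackson2JsonMessageConverter jackson2JsonConverter;

    @Autowired
    ErrorHandler amqpErrorHandlingExceptionStrategy;

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public ErrorHandler amqpErrorHandlingExceptionStrategy() {
        return new ConditionalRejectingErrorHandler(new AmqpErrorHandlingExceptionStrategy());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jackson2JsonConverter);
        factory.setErrorHandler(amqpErrorHandlingExceptionStrategy);
        return factory;
    }

    public static class AmqpErrorHandlingExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger LOGGER = LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            // Log the source queue and the failed message, then defer to the default decision
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                LOGGER.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

}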

Use an "after receive" MessagePostProcessor to add the contentType header to the inbound message.

Starting with version 2.0, you can add the MPP to the container factory.
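
For the 2.0+ route, a minimal sketch of what adding the MPP to the container factory might look like is shown here. It assumes Spring AMQP 2.0 or later and a ConnectionFactory bean already present in the context; the class name DefaultContentTypeConfig is hypothetical. As in the example for earlier versions, the post-processor overwrites the content type unconditionally, so it assumes every message on the queue really is JSON.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DefaultContentTypeConfig { // hypothetical class name

    // "rabbitListenerContainerFactory" is the bean name @RabbitListener looks up by default
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // Applied to each inbound message after it is received and before the listener is invoked.
        // Assumption: all messages on the queue are JSON, so the header is set unconditionally.
        factory.setAfterReceivePostProcessors(m -> {
            m.getMessageProperties().setContentType("application/json");
            return m;
        });
        return factory;
    }
}
```

A custom error handler, such as the ConditionalRejectingErrorHandler from the question, could be set on the same factory with setErrorHandler(...).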

For earlier versions, you can reconfigure...

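import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication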
public class So47424449Application {

    public static void main(String[] args) {
        SpringApplication.run(So47424449Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, RabbitTemplate template) {
        return args -> {
            SimpleMessageListenerContainer container =
                    (SimpleMessageListenerContainer) registry.getListenerContainer("myListener");
            container.setAfterReceivePostProcessors(m -> {
                m.getMessageProperties().setContentType("application/json");
                return m;
            });
            container.start();

            // send a message with no content type
            template.setMessageConverter(new SimpleMessageConverter());
            template.convertAndSend("foo", "{\"bar\":\"baz\"}", m -> {
                m.getMessageProperties().setContentType(null);
                return m;
            });
            template.convertAndSend("foo", "{\"bar\":\"qux\"}", m -> {
                m.getMessageProperties().setContentType(null);
                return m;
            });
        };
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @RabbitListener(id = "myListener", queues = "foo", autoStartup = "false")
    public void listen(Foo foo) {
        System.out.println(foo);
        if (foo.bar.equals("qux")) {
            throw new MessageConversionException("test");
        }
    }

    public static class Foo {

        public String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

As you can see, because it modifies the source message, the modified header is available in the error handler...

2017-11-22 09:39:26.615 WARN 97368 --- [cTaskExecutor-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"bar":"qux"}' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=2, consumerTag=amq.ctag-re1kcxKV14L_nl186stM0w, consumerQueue=foo])
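
The reason the header shows up in that log line can be illustrated without Spring. The following is only a plain-Java model, not Spring AMQP code: Msg, ListenerFailure, and deliver are hypothetical stand-ins for the message, the listener exception, and the container's delivery step. It shows that when the post-processor mutates and returns the same message instance, whatever later inspects the failed message sees the mutated property.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class PostProcessorModel {

    /** Hypothetical stand-in for an AMQP message: a body plus mutable properties. */
    static final class Msg {
        final String body;
        final Map<String, String> properties = new HashMap<>();

        Msg(String body) {
            this.body = body;
        }
    }

    /** Hypothetical stand-in for a listener failure that carries the failed message. */
    static final class ListenerFailure extends RuntimeException {
        final Msg failedMessage;

        ListenerFailure(Msg failedMessage, Throwable cause) {
            super(cause);
            this.failedMessage = failedMessage;
        }
    }

    /** Runs the post-processor, then the listener; a listener error is wrapped with the processed message. */
    static void deliver(Msg received, UnaryOperator<Msg> afterReceive, Consumer<Msg> listener) {
        Msg processed = afterReceive.apply(received);
        try {
            listener.accept(processed);
        } catch (RuntimeException e) {
            throw new ListenerFailure(processed, e);
        }
    }

    /** Returns the content type that an error handler would read from the failed message. */
    public static String contentTypeSeenByErrorHandler() {
        Msg inbound = new Msg("{\"bar\":\"qux\"}"); // no content type set by the sender
        try {
            deliver(inbound,
                    m -> { m.properties.put("contentType", "application/json"); return m; },
                    m -> { throw new IllegalStateException("test"); });
            return null; // not reached: the listener always throws
        } catch (ListenerFailure failure) {
            return failure.failedMessage.properties.get("contentType");
        }
    }

    public static void main(String[] args) {
        System.out.println(contentTypeSeenByErrorHandler());
    }
}
```

In this model the "error handler" is just the catch block; the point is only that no copy of the message is made between the post-processor and the failure report.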