默认为 content_type application/json 并从 DefaultExceptionStrategy 覆盖 isFatal
Default to content_type application/json with overriden isFatal from DefaultExceptionStrategy
我不想要求我的客户提供 content_type application/json,而是默认提供。我得到了这个工作。
我还尝试结合另一个示例从 ConditionalRejectingErrorHandler 实现自定义 isFatal(Throwable t)。我可以触发我的自定义错误处理程序,但似乎又需要 content_type 属性。我不知道如何让他们同时工作。
有什么想法吗?
以下成功地不需要 content_type
编辑:下面的代码并不像我想的那样工作。队列中带有 属性 content_type application/json 的旧消息必须已被拉入
@EnableRabbit
@Configuration
public class ExampleRabbitConfigurer implements
RabbitListenerConfigurer {
@Value("${spring.rabbitmq.host:'localhost'}")
private String host;
@Value("${spring.rabbitmq.port:5672}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Autowired
private MappingJackson2MessageConverter mappingJackson2MessageConverter;
@Autowired
private DefaultMessageHandlerMethodFactory messageHandlerMethodFactory;
@Bean
public MappingJackson2MessageConverter mappingJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(mappingJackson2MessageConverter);
return factory;
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
}
下面的代码用于覆盖 ConditionalRejectingErrorHandler 中的 isFatal()。 SimpleRabbitListenerContainerFactory.setMessageConverter() 看起来应该与 DefaultMessageHandlerMethodFactory.setMessageConverter() 具有相同的目的。显然情况并非如此。
@EnableRabbit
@Configuration
public class ExampleRabbitConfigurer {
@Value("${spring.rabbitmq.host:'localhost'}")
private String host;
@Value("${spring.rabbitmq.port:5672}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Autowired
ConnectionFactory connectionFactory;
@Autowired
Jackson2JsonMessageConverter jackson2JsonConverter;
@Autowired
ErrorHandler amqpErrorHandlingExceptionStrategy;
@Bean
public Jackson2JsonMessageConverter jackson2JsonConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public ErrorHandler amqpErrorHandlingExceptionStrategy() {
return new ConditionalRejectingErrorHandler(new AmqpErrorHandlingExceptionStrategy());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonConverter);
factory.setErrorHandler(amqpErrorHandlingExceptionStrategy);
return factory;
}
public static class AmqpErrorHandlingExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(getClass());
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
LOGGER.error("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
}
return super.isFatal(t);
}
}
使用 "after receive" MessagePostProcessor
将 contentType
header 添加到入站消息中。
从 2.0 版本开始,您可以将 MPP 添加到容器工厂。
对于早期版本,您可以重新配置...
@SpringBootApplication
public class So47424449Application {
public static void main(String[] args) {
SpringApplication.run(So47424449Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, RabbitTemplate template) {
return args -> {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer("myListener");
container.setAfterReceivePostProcessors(m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
container.start();
// send a message with no content type
template.setMessageConverter(new SimpleMessageConverter());
template.convertAndSend("foo", "{\"bar\":\"baz\"}", m -> {
m.getMessageProperties().setContentType(null);
return m;
});
template.convertAndSend("foo", "{\"bar\":\"qux\"}", m -> {
m.getMessageProperties().setContentType(null);
return m;
});
};
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@RabbitListener(id = "myListener", queues = "foo", autoStartup = "false")
public void listen(Foo foo) {
System.out.println(foo);
if (foo.bar.equals("qux")) {
throw new MessageConversionException("test");
}
}
public static class Foo {
public String bar;
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
如您所见,由于它修改了源消息,修改后的 header 在错误处理程序中可用...
2017-11-22 09:39:26.615 WARN 97368 --- [cTaskExecutor-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"bar":"qux"}' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=2, consumerTag=amq.ctag-re1kcxKV14L_nl186stM0w, consumerQueue=foo]), contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=2, consumerTag=amq.ctag-re1kcxKV14L_nl186stM0w, consumerQueue=foo])
我不想要求我的客户提供 content_type application/json,而是默认提供。我得到了这个工作。
我还尝试结合另一个示例从 ConditionalRejectingErrorHandler 实现自定义 isFatal(Throwable t)。我可以触发我的自定义错误处理程序,但似乎又需要 content_type 属性。我不知道如何让他们同时工作。
有什么想法吗?
以下成功地不需要 content_type 编辑:下面的代码并不像我想的那样工作。队列中带有 属性 content_type application/json 的旧消息必须已被拉入
@EnableRabbit
@Configuration
public class ExampleRabbitConfigurer implements
RabbitListenerConfigurer {
@Value("${spring.rabbitmq.host:'localhost'}")
private String host;
@Value("${spring.rabbitmq.port:5672}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Autowired
private MappingJackson2MessageConverter mappingJackson2MessageConverter;
@Autowired
private DefaultMessageHandlerMethodFactory messageHandlerMethodFactory;
@Bean
public MappingJackson2MessageConverter mappingJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(mappingJackson2MessageConverter);
return factory;
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
}
下面的代码用于覆盖 ConditionalRejectingErrorHandler 中的 isFatal()。 SimpleRabbitListenerContainerFactory.setMessageConverter() 看起来应该与 DefaultMessageHandlerMethodFactory.setMessageConverter() 具有相同的目的。显然情况并非如此。
@EnableRabbit
@Configuration
public class ExampleRabbitConfigurer {
@Value("${spring.rabbitmq.host:'localhost'}")
private String host;
@Value("${spring.rabbitmq.port:5672}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Autowired
ConnectionFactory connectionFactory;
@Autowired
Jackson2JsonMessageConverter jackson2JsonConverter;
@Autowired
ErrorHandler amqpErrorHandlingExceptionStrategy;
@Bean
public Jackson2JsonMessageConverter jackson2JsonConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public ErrorHandler amqpErrorHandlingExceptionStrategy() {
return new ConditionalRejectingErrorHandler(new AmqpErrorHandlingExceptionStrategy());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonConverter);
factory.setErrorHandler(amqpErrorHandlingExceptionStrategy);
return factory;
}
public static class AmqpErrorHandlingExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(getClass());
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
LOGGER.error("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
}
return super.isFatal(t);
}
}
使用 "after receive" MessagePostProcessor
将 contentType
header 添加到入站消息中。
从 2.0 版本开始,您可以将 MPP 添加到容器工厂。
对于早期版本,您可以重新配置...
@SpringBootApplication
public class So47424449Application {
public static void main(String[] args) {
SpringApplication.run(So47424449Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, RabbitTemplate template) {
return args -> {
SimpleMessageListenerContainer container =
(SimpleMessageListenerContainer) registry.getListenerContainer("myListener");
container.setAfterReceivePostProcessors(m -> {
m.getMessageProperties().setContentType("application/json");
return m;
});
container.start();
// send a message with no content type
template.setMessageConverter(new SimpleMessageConverter());
template.convertAndSend("foo", "{\"bar\":\"baz\"}", m -> {
m.getMessageProperties().setContentType(null);
return m;
});
template.convertAndSend("foo", "{\"bar\":\"qux\"}", m -> {
m.getMessageProperties().setContentType(null);
return m;
});
};
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@RabbitListener(id = "myListener", queues = "foo", autoStartup = "false")
public void listen(Foo foo) {
System.out.println(foo);
if (foo.bar.equals("qux")) {
throw new MessageConversionException("test");
}
}
public static class Foo {
public String bar;
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
如您所见,由于它修改了源消息,修改后的 header 在错误处理程序中可用...
2017-11-22 09:39:26.615 WARN 97368 --- [cTaskExecutor-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"bar":"qux"}' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=2, consumerTag=amq.ctag-re1kcxKV14L_nl186stM0w, consumerQueue=foo]), contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=2, consumerTag=amq.ctag-re1kcxKV14L_nl186stM0w, consumerQueue=foo])