Spring AMQP - 发送和接收消息
Spring AMQP - Sender and Receiving Messages
我在接收来自 RabbitMQ 的消息时遇到问题。
我正在发送如下消息
HashMap<Object, Object> senderMap=new HashMap<>();
senderMap.put("STATUS", "SUCCESS");
senderMap.put("EXECUTION_START_TIME", new Date());
rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);
如果我们在 RabbitMQ 中看到,我们将得到一个完全合格的类型。
在当前场景中,我们有 n 个生产者用于同一个消费者。如果我使用任何映射器,都会导致异常。
我将如何发送消息以使其不包含任何 type_id 并且我可以将消息作为 Message 对象接收,稍后我可以将其绑定到接收器中的自定义对象。
我收到如下消息。
您能否让我知道如何使用 Jackson2MessageConverter 以便消息将从接收端直接绑定到我的 Object/HashMap。另外,我现在已经从发件人那里删除了 Type_ID。
消息在 RabbitMQ 中的外观
priority: 0 delivery_mode: 2 headers:
ContentTypeId: java.lang.Object
KeyTypeId: java.lang.Object content_encoding: UTF-8 content_type: application/json
{"Execution_start_time":1473747183636,"status":"SUCCESS"}
@Component
public class AdapterOutputHandler {
private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);
@RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
public void handleAdapterQueueMessage(HashMap<String,Object> message){
System.out.println("Receiver:::::::::::"+message.toString());
}
}
连接
@Bean(name="adapterOPListenerContainerFactory")
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
messageConverter.setClassMapper(classMapper);
factory.setMessageConverter(messageConverter);
}
异常
Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)
我不想使用来自发送方的 __TYPE__ID,因为它们是同一队列的多个发送方,而且只有一个消费者。
it leads to an exception
什么异常?
TypeId: com.diff.approach.JobListenerDTO
这意味着您发送的是 DTO,而不是您在问题中描述的哈希映射。
如果要删除 typeId header,可以使用消息 post 处理器...
rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
m.getMessageProperties.getHeaders().remove("__TypeId__");
return m;
});
(如果您不使用 Java 8,则为 , new MessagePostProcessor() {...}
)。
编辑
您使用的 Spring AMQP 是什么版本?使用 1.6,您甚至不必删除 __TypeId__
header - 框架会查看侦听器参数类型并告诉 Jackson 转换器该类型,以便它自动转换为该类型(如果可以的话)。正如你在这里看到的;它在不删除类型 id 的情况下工作正常...
package com.example;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class So39443850Application {
private static final String QUEUE = "so39443850";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
context.close();
}
private final CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
public void listen(HashMap<String, Object> message) {
System.out.println(message.getClass() + ":" + message);
latch.countDown();
}
@Bean
public Queue queue() {
return new Queue(QUEUE);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
public static class DTO {
private String foo;
private String baz;
public DTO(String foo, String baz) {
this.foo = foo;
this.baz = baz;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
public String getBaz() {
return this.baz;
}
public void setBaz(String baz) {
this.baz = baz;
}
}
}
结果:
class java.util.HashMap:{foo=baz, baz=qux}
这在 the documentation...
中有描述
In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.
您还可以将自定义 ClassMapper
配置为始终 return HashMap
。
想要在接收消息时使用 "a" 不同的 Java 调用?
使用自定义配置 @Bean Jackson2JsonMessageConverter ClassMapper
想在接收消息时使用"many" 不同的Java 调用?例如:
@MyAmqpMsgListener
void handlerMsg(
// Main message class, by MessageConverter
@Payload MyMsg myMsg,
// Secondary message class - by MessageConverter->ConversionService
@Payload Map<String, String> map,
org.springframework.messaging.Message<MyMsg> msg,
org.springframework.amqp.core.Message amqpMsg
) {
// ...
}
提供自定义@Bean Converter
, ConversionService
, RabbitListenerAnnotationBeanPostProcessor
:
@Bean
FormattingConversionServiceFactoryBean rabbitMqCs(
Set<Converter> converters
) {
FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean();
fac.setConverters(converters);
return fac;
}
@Bean
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory(
@Qualifier("rabbitMqCs")
FormattingConversionService rabbitMqCs
) {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
defaultFactory.setConversionService(rabbitMqCs);
return defaultFactory;
}
// copied from RabbitBootstrapConfiguration
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor(
MessageHandlerMethodFactory handlerFac
) {
RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor();
bpp.setMessageHandlerMethodFactory(handlerFac);
return bpp;
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
参考文献:
我在接收来自 RabbitMQ 的消息时遇到问题。 我正在发送如下消息
HashMap<Object, Object> senderMap=new HashMap<>();
senderMap.put("STATUS", "SUCCESS");
senderMap.put("EXECUTION_START_TIME", new Date());
rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);
如果我们在 RabbitMQ 中看到,我们将得到一个完全合格的类型。
在当前场景中,我们有 n 个生产者用于同一个消费者。如果我使用任何映射器,都会导致异常。 我将如何发送消息以使其不包含任何 type_id 并且我可以将消息作为 Message 对象接收,稍后我可以将其绑定到接收器中的自定义对象。
我收到如下消息。 您能否让我知道如何使用 Jackson2MessageConverter 以便消息将从接收端直接绑定到我的 Object/HashMap。另外,我现在已经从发件人那里删除了 Type_ID。
消息在 RabbitMQ 中的外观
priority: 0 delivery_mode: 2 headers:
ContentTypeId: java.lang.Object KeyTypeId: java.lang.Object content_encoding: UTF-8 content_type: application/json {"Execution_start_time":1473747183636,"status":"SUCCESS"}
@Component
public class AdapterOutputHandler {
private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);
@RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
public void handleAdapterQueueMessage(HashMap<String,Object> message){
System.out.println("Receiver:::::::::::"+message.toString());
}
}
连接
@Bean(name="adapterOPListenerContainerFactory")
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
messageConverter.setClassMapper(classMapper);
factory.setMessageConverter(messageConverter);
}
异常
Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)
我不想使用来自发送方的 __TYPE__ID,因为它们是同一队列的多个发送方,而且只有一个消费者。
it leads to an exception
什么异常?
TypeId: com.diff.approach.JobListenerDTO
这意味着您发送的是 DTO,而不是您在问题中描述的哈希映射。
如果要删除 typeId header,可以使用消息 post 处理器...
rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
m.getMessageProperties.getHeaders().remove("__TypeId__");
return m;
});
(如果您不使用 Java 8,则为 , new MessagePostProcessor() {...}
)。
编辑
您使用的 Spring AMQP 是什么版本?使用 1.6,您甚至不必删除 __TypeId__
header - 框架会查看侦听器参数类型并告诉 Jackson 转换器该类型,以便它自动转换为该类型(如果可以的话)。正如你在这里看到的;它在不删除类型 id 的情况下工作正常...
package com.example;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class So39443850Application {
private static final String QUEUE = "so39443850";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
context.close();
}
private final CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
public void listen(HashMap<String, Object> message) {
System.out.println(message.getClass() + ":" + message);
latch.countDown();
}
@Bean
public Queue queue() {
return new Queue(QUEUE);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
public static class DTO {
private String foo;
private String baz;
public DTO(String foo, String baz) {
this.foo = foo;
this.baz = baz;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
public String getBaz() {
return this.baz;
}
public void setBaz(String baz) {
this.baz = baz;
}
}
}
结果:
class java.util.HashMap:{foo=baz, baz=qux}
这在 the documentation...
中有描述In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.
您还可以将自定义 ClassMapper
配置为始终 return HashMap
。
想要在接收消息时使用 "a" 不同的 Java 调用?
使用自定义配置 @Bean Jackson2JsonMessageConverter ClassMapper
想在接收消息时使用"many" 不同的Java 调用?例如:
@MyAmqpMsgListener void handlerMsg( // Main message class, by MessageConverter @Payload MyMsg myMsg, // Secondary message class - by MessageConverter->ConversionService @Payload Map<String, String> map, org.springframework.messaging.Message<MyMsg> msg, org.springframework.amqp.core.Message amqpMsg ) { // ... }
提供自定义@Bean
Converter
,ConversionService
,RabbitListenerAnnotationBeanPostProcessor
:@Bean FormattingConversionServiceFactoryBean rabbitMqCs( Set<Converter> converters ) { FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean(); fac.setConverters(converters); return fac; } @Bean DefaultMessageHandlerMethodFactory messageHandlerMethodFactory( @Qualifier("rabbitMqCs") FormattingConversionService rabbitMqCs ) { DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); defaultFactory.setConversionService(rabbitMqCs); return defaultFactory; } // copied from RabbitBootstrapConfiguration @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor( MessageHandlerMethodFactory handlerFac ) { RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor(); bpp.setMessageHandlerMethodFactory(handlerFac); return bpp; } @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() { return new RabbitListenerEndpointRegistry(); }
参考文献: