使用 Spring 将 AMQP 消息解码为 Map
Decode AMQP message as a Map using Spring
我必须使用 Spring 解码 AMQP 消息。为了处理它,我现在使用:
// Configure queue.
RabbitAdmin admin = new RabbitAdmin(cf);
Queue queue = new Queue(queueName);
admin.declareQueue(queue);
FanoutExchange exchange = new FanoutExchange(exchangeName);
admin.declareExchange(exchange);
admin.declareBinding(BindingBuilder.bind(queue).to(exchange));
// set up the listener and container
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
container.setMessageListener(adapter);
container.setQueueNames(queueName);
container.start();
我的听众是
public class DataListener {
public void handleMessage(Object incomingMessage) {
LOGGER.error("AMQP: got message.{}", incomingMessage);
}
}
使用AmqpTemplate的convertAndSend方法发送消息。 AmqpTemplate 没有配置,一切都是默认的。
我如何才能将我的 incomingMessage 作为字段的 HashMap 接收?我不想将它与特定对象类型强耦合。
假设您的意思是您的消息是一个 POJO bean...
使用 JSON - 在出站端使用 Jackson2JsonMessageConverter
而不是默认的 SimpleMessageConverter
,后者使用 Java 序列化。
在接收端,同一个 JSON 转换器将尝试将传入流转换为原始 POJO。
为避免这种情况,请配置 JSON 消息转换器以将 class 名称映射到 HashMap 而不是原始 POJO。
您可以通过为转换器提供自定义 DefaultJackson2JavaTypeMapper
来做到这一点,该自定义 DefaultJackson2JavaTypeMapper
配置为将 class 名称从 __TypeId__
header 映射到 java.util.HashMap
.
编辑
或者你可以简单地注入一个总是 returns HashMap
的 ClassMapper
- 这是我写的一个快速启动应用程序来说明该技术:
@SpringBootApplication
public class So36837736Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So36837736Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend(new Foo("bar"));
Thread.sleep(10000);
context.close();
}
@Bean
public RabbitTemplate template(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setRoutingKey(queue().getName());
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public void handleMessage(Map<String, Object> map) {
System.out.println("\n\n\n" + map + "\n\n\n");
}
});
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
ClassMapper classMapper = new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
@Override
public Class<?> toClass(MessageProperties properties) {
return HashMap.class;
}
};
messageConverter.setClassMapper(classMapper);
adapter.setMessageConverter(messageConverter);
container.setMessageListener(adapter);
return container;
}
@Bean
public Queue queue() {
return new AnonymousQueue();
}
public static class Foo {
private final String bar;
private Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
}
}
我必须使用 Spring 解码 AMQP 消息。为了处理它,我现在使用:
// Configure queue.
RabbitAdmin admin = new RabbitAdmin(cf);
Queue queue = new Queue(queueName);
admin.declareQueue(queue);
FanoutExchange exchange = new FanoutExchange(exchangeName);
admin.declareExchange(exchange);
admin.declareBinding(BindingBuilder.bind(queue).to(exchange));
// set up the listener and container
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
container.setMessageListener(adapter);
container.setQueueNames(queueName);
container.start();
我的听众是
public class DataListener {
public void handleMessage(Object incomingMessage) {
LOGGER.error("AMQP: got message.{}", incomingMessage);
}
}
使用AmqpTemplate的convertAndSend方法发送消息。 AmqpTemplate 没有配置,一切都是默认的。
我如何才能将我的 incomingMessage 作为字段的 HashMap 接收?我不想将它与特定对象类型强耦合。
假设您的意思是您的消息是一个 POJO bean...
使用 JSON - 在出站端使用 Jackson2JsonMessageConverter
而不是默认的 SimpleMessageConverter
,后者使用 Java 序列化。
在接收端,同一个 JSON 转换器将尝试将传入流转换为原始 POJO。
为避免这种情况,请配置 JSON 消息转换器以将 class 名称映射到 HashMap 而不是原始 POJO。
您可以通过为转换器提供自定义 DefaultJackson2JavaTypeMapper
来做到这一点,该自定义 DefaultJackson2JavaTypeMapper
配置为将 class 名称从 __TypeId__
header 映射到 java.util.HashMap
.
编辑
或者你可以简单地注入一个总是 returns HashMap
的 ClassMapper
- 这是我写的一个快速启动应用程序来说明该技术:
@SpringBootApplication
public class So36837736Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So36837736Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend(new Foo("bar"));
Thread.sleep(10000);
context.close();
}
@Bean
public RabbitTemplate template(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setRoutingKey(queue().getName());
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public void handleMessage(Map<String, Object> map) {
System.out.println("\n\n\n" + map + "\n\n\n");
}
});
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
ClassMapper classMapper = new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
@Override
public Class<?> toClass(MessageProperties properties) {
return HashMap.class;
}
};
messageConverter.setClassMapper(classMapper);
adapter.setMessageConverter(messageConverter);
container.setMessageListener(adapter);
return container;
}
@Bean
public Queue queue() {
return new AnonymousQueue();
}
public static class Foo {
private final String bar;
private Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
}
}