使用 Spring 将 AMQP 消息解码为 Map

Decode AMQP message as a Map using Spring

我必须使用 Spring 解码 AMQP 消息。为了处理它,我现在使用:

    // Configure queue. 
    RabbitAdmin admin = new RabbitAdmin(cf);
    Queue queue = new Queue(queueName);
    admin.declareQueue(queue);
    FanoutExchange exchange = new FanoutExchange(exchangeName);
    admin.declareExchange(exchange);
    admin.declareBinding(BindingBuilder.bind(queue).to(exchange));  

    // set up the listener and container
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);

    MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
    container.setMessageListener(adapter);
    container.setQueueNames(queueName);
    container.start();

我的听众是

public class DataListener {

    public void handleMessage(Object incomingMessage) {
        LOGGER.error("AMQP: got message.{}", incomingMessage);
    }   

}

使用AmqpTemplate的convertAndSend方法发送消息。 AmqpTemplate 没有配置,一切都是默认的。

我如何才能将我的 incomingMessage 作为字段的 HashMap 接收?我不想将它与特定对象类型强耦合。

假设您的意思是您的消息是一个 POJO bean...

使用 JSON - 在出站端使用 Jackson2JsonMessageConverter 而不是默认的 SimpleMessageConverter,后者使用 Java 序列化。

在接收端,同一个 JSON 转换器将尝试将传入流转换为原始 POJO。

为避免这种情况,请配置 JSON 消息转换器以将 class 名称映射到 HashMap 而不是原始 POJO。

您可以通过为转换器提供自定义 DefaultJackson2JavaTypeMapper 来做到这一点,该自定义 DefaultJackson2JavaTypeMapper 配置为将 class 名称从 __TypeId__ header 映射到 java.util.HashMap.

编辑

或者你可以简单地注入一个总是 returns HashMapClassMapper - 这是我写的一个快速启动应用程序来说明该技术:

@SpringBootApplication
public class So36837736Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So36837736Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend(new Foo("bar"));
        Thread.sleep(10000);
        context.close();
    }

    @Bean
    public RabbitTemplate template(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setRoutingKey(queue().getName());
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue());
        MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() {

            @SuppressWarnings("unused")
            public void handleMessage(Map<String, Object> map) {
                System.out.println("\n\n\n" + map + "\n\n\n");
            }

        });
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        ClassMapper classMapper = new ClassMapper() {

            @Override
            public void fromClass(Class<?> clazz, MessageProperties properties) {
            }

            @Override
            public Class<?> toClass(MessageProperties properties) {
                return HashMap.class;
            }

        };
        messageConverter.setClassMapper(classMapper);
        adapter.setMessageConverter(messageConverter);
        container.setMessageListener(adapter);
        return container;
    }

    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    public static class Foo {

        private final String bar;

        private Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

    }

}