spring boot rabbitmq MappingJackson2MessageConverter 自定义对象转换

spring boot rabbitmq MappingJackson2MessageConverter custom object conversion

我正在尝试创建一个简单的 Spring Boot 应用程序,用来向 RabbitMQ 的 exchange/queue "produce"(发送)消息,以及另一个用来 "consume"(消费)这些消息的 Spring Boot 示例应用程序。 所以我有两个应用程序(如果你愿意,也可以称之为微服务)。 1) "producer" 微服务 2) "consumer" 微服务

"producer" 有 2 个域对象。 Foo 和 Bar 应转换为 json 并发送到 rabbitmq。 "consumer" 应该接收 json 消息并将其分别转换为域 Foo 和 Bar。 出于某种原因,我无法完成这个简单的任务。这方面的例子不多。 对于消息转换器,我想使用 org.springframework.messaging.converter.MappingJackson2MessageConverter

这是我目前的情况:

生产者微服务

package demo.producer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;

/**
 * Producer-side Spring Boot entry point.
 *
 * <p>Declares the queue, the topic exchange, the binding between them and a JSON message
 * converter as beans. After start-up it hands one {@code Foo} and one {@code Bar} to the
 * {@link Sender}.
 */
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {

    /** Publisher used by {@link #run(String...)}. */
    @Autowired
    private Sender sender;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    /** Publishes one default-constructed {@code Foo} and one {@code Bar}. */
    @Override
    public void run(String... args) throws Exception {
        final Foo foo = new Foo();
        final Bar bar = new Bar();
        sender.sendToRabbitmq(foo, bar);
    }

    /** Non-durable queue named {@code "queue"}. */
    @Bean
    Queue queue() {
        final boolean durable = false;
        return new Queue("queue", durable);
    }

    /** Topic exchange named {@code "exchange"}. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    /** Binds the queue to the exchange using the routing key {@code "queue"}. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("queue");
    }

    /** Spring-messaging Jackson converter with its default settings. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }
}

/**
 * Publishes the demo payloads to RabbitMQ through a {@link RabbitMessagingTemplate}.
 *
 * <p>The Jackson converter is installed on the template once, when this service is
 * constructed, rather than being re-applied to the shared template on every send.
 */
@Service
class Sender {

    private final RabbitMessagingTemplate rabbitMessagingTemplate;

    /**
     * @param rabbitMessagingTemplate template used for publishing; its message converter is
     *        replaced with {@code mappingJackson2MessageConverter}
     * @param mappingJackson2MessageConverter converter applied to outgoing payloads
     */
    @Autowired
    Sender(final RabbitMessagingTemplate rabbitMessagingTemplate,
           final MappingJackson2MessageConverter mappingJackson2MessageConverter) {
        // Configure the template once, up front; sendToRabbitmq() then only sends.
        rabbitMessagingTemplate.setMessageConverter(mappingJackson2MessageConverter);
        this.rabbitMessagingTemplate = rabbitMessagingTemplate;
    }

    /**
     * Sends {@code foo} and then {@code bar} to the exchange {@code "exchange"} with the
     * routing key {@code "queue"}.
     *
     * @param foo first payload to publish
     * @param bar second payload to publish
     */
    public void sendToRabbitmq(final Foo foo, final Bar bar) {
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo);
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar);
    }
}

/** Producer-side sample payload with a single public field, {@code age}, initialised to 33. */
class Bar {
    public int age = 33;
}

class Foo {
    public String name = "gustavo";
}

消费者微服务

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

/**
 * Consumer-side Spring Boot entry point with {@code @EnableRabbit} listener support.
 *
 * <p>This version declares no beans of its own; in particular it registers no message
 * converter.
 */
@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements CommandLineRunner {

    /** Injected listener bean; not referenced by any method of this class. */
    @Autowired
    private Receiver receiver;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // No start-up work is performed here.
    }
}

/**
 * Holds the two {@code @RabbitListener} methods of the consumer, one per payload type.
 * Both are attached to the queue named {@code "queue"}.
 */
@Service
class Receiver {

    /** Prints the {@code name} of a received {@link Foo}. */
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        final String line = "Received <" + foo.name + ">";
        System.out.println(line);
    }

    /** Prints the {@code age} of a received {@link Bar}. */
    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        final String line = "Received <" + bar.age + ">";
        System.out.println(line);
    }
}

/** Consumer-side payload type with a single public field, {@code name}, read by {@code Receiver}. */
class Foo {
    public String name;
}

/** Consumer-side payload type with a single public field, {@code age}, read by {@code Receiver}. */
class Bar {
    public int age;
}

这是我遇到的异常:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)]
Bean [demo.consumer.Receiver@1672fe87]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113)
    ... 12 common frames omitted

异常说没有转换器,这是真的,我的问题是我不知道如何在消费者端设置 MappingJackson2MessageConverter 转换器(请注意我想使用 org.springframework.messaging.converter.MappingJackson2MessageConverter 而不是 org.springframework.amqp.support.converter.JsonMessageConverter)

有什么想法吗?

以防万一,您可以在以下位置创建示例项目: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

我自己还没有这样做过,但您似乎需要通过配置 RabbitTemplate 来注册合适的转换器。请参阅这份 Spring 文档。我知道它是使用 AMQP 的类来配置的,但如果您提到的 messaging 类与之兼容,就没有理由不能用它来替换。看起来这份参考文档中的第 3.1.8 节解释了如何使用 Java 配置而不是 XML 来完成。我没有真正使用过 Rabbit,所以没有任何个人经验,但我很想听听您的发现。

好的,我终于搞定了。

Spring 使用一个 PayloadArgumentResolver 来提取并转换消息,然后把转换后的对象作为参数传给用 @RabbitListener 注释的方法。我们需要以某种方式将 mappingJackson2MessageConverter 设置到该对象中。

因此,在 CONSUMER 应用程序中,我们需要实现 RabbitListenerConfigurer。通过覆盖 configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) 我们可以设置自定义 DefaultMessageHandlerMethodFactory,我们设置消息转换器到这个工厂,工厂将创建我们的 PayloadArgumentResolver 正确转换。

这是代码片段,我还更新了 git project

ConsumerApplication.java

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

/**
 * Consumer-side Spring Boot entry point that plugs a Jackson converter into the
 * {@code @RabbitListener} infrastructure.
 *
 * <p>It implements {@link RabbitListenerConfigurer} so that the listener endpoint registrar
 * uses a {@link DefaultMessageHandlerMethodFactory} configured with
 * {@link MappingJackson2MessageConverter}.
 */
@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements RabbitListenerConfigurer {

    /** Injected listener bean; not referenced by any method of this class. */
    @Autowired
    private Receiver receiver;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    /** Installs the custom handler-method factory on the listener endpoint registrar. */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        final DefaultMessageHandlerMethodFactory handlerFactory = myHandlerMethodFactory();
        registrar.setMessageHandlerMethodFactory(handlerFactory);
    }

    /** Handler-method factory whose message converter is {@link #jackson2Converter()}. */
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        final DefaultMessageHandlerMethodFactory handlerFactory =
                new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(jackson2Converter());
        return handlerFactory;
    }

    /** Spring-messaging Jackson converter with its default settings. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }

}

/**
 * Holds the two {@code @RabbitListener} methods of the consumer, one per payload type.
 *
 * <p>NOTE(review): both methods listen on the same queue, {@code "queue"}, so which method
 * receives a given message presumably does not depend on its payload type. Confirm whether
 * one queue per payload type is needed.
 */
@Service
class Receiver {

    /** Prints the {@code name} of a received {@link Foo}. */
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        final String line = "Received <" + foo.name + ">";
        System.out.println(line);
    }

    /** Prints the {@code age} of a received {@link Bar}. */
    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        final String line = "Received <" + bar.age + ">";
        System.out.println(line);
    }
}

/** Consumer-side payload type with a single public field, {@code name}, read by {@code Receiver}. */
class Foo {
    public String name;
}

/** Consumer-side payload type with a single public field, {@code age}, read by {@code Receiver}. */
class Bar {
    public int age;
}

因此,如果您运行生产者微服务,它将向队列中添加 2 条消息:一条代表 Foo 对象,另一条代表 Bar 对象。 运行消费者微服务后,您将看到这两条消息分别被 Receiver 类中的相应方法消费。

更新问题:

我认为我这边对队列的理解存在概念性问题。声明 2 个用 @RabbitListener 注释且指向同一队列的方法,并不能实现我想要的效果。上述解决方案无法正常工作。如果您向 rabbitmq 发送 6 条 Foo 消息和 3 条 Bar 消息,带有 Foo 参数的侦听器并不会恰好收到 6 次。似乎侦听器是并行调用的,因此无法根据方法参数类型来区分应该调用哪个侦听器。 我的解决方案(我不确定这是否是最好的方法,欢迎提出建议)是为每个实体创建一个队列。 所以现在,我有 queue.bar 和 queue.foo 两个队列,并相应地更新了侦听器,例如 @RabbitListener(queues = "queue.foo")。我再次更新了代码,您可以在我的 git repository 中查看。