如何在 Spring Boot 和 RabbitMQ 中配置 jSON 有效负载并将其接收并转换为域对象
How to configure and receiveAndConvert jSON payload into domain Object in Spring Boot and RabbitMQ
最近我对使用Spring Boot 的微服务架构产生了浓厚的兴趣。我的实现有两个 Spring 启动应用程序;
应用程序一 从 RESTful API 接收请求,转换并发送 jSON 有效负载到 RabbitMQ queueA 。
应用程序二,已订阅 queueA,接收 jSON 有效负载(域对象用户)并且应该在应用程序二中激活服务,例如。向用户发送电子邮件。
在我的 应用程序二 配置中不使用 XML,我如何配置一个转换器,将从 RabbitMQ 收到的 jSON 有效负载转换为域对象用户。
以下是 Spring 应用程序二
上的引导配置的片段
Application.class
@SpringBootApplication
@EnableRabbit
public class ApplicationInitializer implements CommandLineRunner {
final static String queueName = "user-registration";
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AnnotationConfigApplicationContext context;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("user-registrations");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
public static void main(String[] args) {
SpringApplication.run(ApplicationInitializer.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Waiting for messages...");
}
}
TestService.java
@Component
public class TestService {
/**
* This test verifies whether this consumer receives message off the user-registration queue
*/
@RabbitListener(queues = "user-registration")
public void testReceiveNewUserNotificationMessage(User user) {
// do something like, convert payload to domain object user and send email to this user
}
}
创建一个 jackson 消息转换器并使用 MessageListenerAdapter#setMessageConverter
进行设置
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
哪里来的MessageListenerAdapter
?
我遇到了同样的问题,经过一些研究和测试后我了解到,在 SpringBoot 中配置 RabbitMQ-Receiver 的方法不止一种,但选择一种并坚持使用很重要。
如果您决定使用 Annotation Driven Listener Endpoint,我从您对 @EnableRabbit 和 @RabbitListener 的使用中得出的结果比您发布的配置对我不起作用。以下是有效的:
从 org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer 派生您的配置 Class 并覆盖方法 configureRabbitListeners 如下:
@Override
public void configureRabbitListeners(
RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
并添加一个 MessageHandlerFactory:
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
此外,您需要定义 SimpleRabbitListenerContainerFactory(就像您已经做的那样)并自动装配相应的 ConnectionFactory:
@Autowired
public ConnectionFactory connectionFactory;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
完成你的配置,你需要定义Bean,它处理你的消息并继承@RabbitListerner-Annotations。对我而言,我将其命名为 EventResultHandler(您将其命名为 TestService):
@Bean
public EventResultHandler eventResultHandler() {
return new EventResultHandler();
}
然后在您的 EventResultHandler(或 TestService)中定义 @RabbitListener-Methods 及其相应的队列和有效负载(= POJO,其中您的 JSON-Message 被序列化为):
@Component
public class EventResultHandler {
@RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
public void handleMessage(@Payload Event event) {
System.out.println("Event received");
System.out.println("EventType: " + event.getType().getText());
}
}
我省略了队列和交换所需的定义和绑定——你可以在一个或另一个微服务中完成——或者在 RabbitMQ-Server 中手动完成……但你肯定必须这样做。
根据 Spring 引导版本 2.1.4.RELEASE,您可以执行以下操作:
- 使用 Jackson 消息转换器声明 "decorated" RabbitMq 模板:
@Bean
RabbitTemplate rabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate
}
- 从队列中读取消息:
var receievedValie = rabbitTemplate.receiveAndConvert("TestQueue", new ParameterizedTypeReference<Integer>() {
@Override
public Type getType() {
return super.getType();
}
})
最近我对使用Spring Boot 的微服务架构产生了浓厚的兴趣。我的实现有两个 Spring 启动应用程序;
应用程序一 从 RESTful API 接收请求,转换并发送 jSON 有效负载到 RabbitMQ queueA 。
应用程序二,已订阅 queueA,接收 jSON 有效负载(域对象用户)并且应该在应用程序二中激活服务,例如。向用户发送电子邮件。
在我的 应用程序二 配置中不使用 XML,我如何配置一个转换器,将从 RabbitMQ 收到的 jSON 有效负载转换为域对象用户。
以下是 Spring 应用程序二
上的引导配置的片段Application.class
@SpringBootApplication
@EnableRabbit
public class ApplicationInitializer implements CommandLineRunner {
final static String queueName = "user-registration";
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AnnotationConfigApplicationContext context;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("user-registrations");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
public static void main(String[] args) {
SpringApplication.run(ApplicationInitializer.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Waiting for messages...");
}
}
TestService.java
@Component
public class TestService {
/**
* This test verifies whether this consumer receives message off the user-registration queue
*/
@RabbitListener(queues = "user-registration")
public void testReceiveNewUserNotificationMessage(User user) {
// do something like, convert payload to domain object user and send email to this user
}
}
创建一个 jackson 消息转换器并使用 MessageListenerAdapter#setMessageConverter
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
哪里来的MessageListenerAdapter
?
我遇到了同样的问题,经过一些研究和测试后我了解到,在 SpringBoot 中配置 RabbitMQ-Receiver 的方法不止一种,但选择一种并坚持使用很重要。
如果您决定使用 Annotation Driven Listener Endpoint,我从您对 @EnableRabbit 和 @RabbitListener 的使用中得出的结果比您发布的配置对我不起作用。以下是有效的:
从 org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer 派生您的配置 Class 并覆盖方法 configureRabbitListeners 如下:
@Override
public void configureRabbitListeners(
RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
并添加一个 MessageHandlerFactory:
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
此外,您需要定义 SimpleRabbitListenerContainerFactory(就像您已经做的那样)并自动装配相应的 ConnectionFactory:
@Autowired
public ConnectionFactory connectionFactory;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
完成你的配置,你需要定义Bean,它处理你的消息并继承@RabbitListerner-Annotations。对我而言,我将其命名为 EventResultHandler(您将其命名为 TestService):
@Bean
public EventResultHandler eventResultHandler() {
return new EventResultHandler();
}
然后在您的 EventResultHandler(或 TestService)中定义 @RabbitListener-Methods 及其相应的队列和有效负载(= POJO,其中您的 JSON-Message 被序列化为):
@Component
public class EventResultHandler {
@RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
public void handleMessage(@Payload Event event) {
System.out.println("Event received");
System.out.println("EventType: " + event.getType().getText());
}
}
我省略了队列和交换所需的定义和绑定——你可以在一个或另一个微服务中完成——或者在 RabbitMQ-Server 中手动完成……但你肯定必须这样做。
根据 Spring 引导版本 2.1.4.RELEASE,您可以执行以下操作:
- 使用 Jackson 消息转换器声明 "decorated" RabbitMq 模板:
@Bean
RabbitTemplate rabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate
}
- 从队列中读取消息:
var receievedValie = rabbitTemplate.receiveAndConvert("TestQueue", new ParameterizedTypeReference<Integer>() {
@Override
public Type getType() {
return super.getType();
}
})