仅从@RabbitListener 获取特定消息
Taking only specific message from @RabbitListener
我有向 RabbitMQ 发送消息的遗留系统。
该系统只使用一个 queue : q.finance.invoice
但它有两种类型的消息,其中消息类型在 header.
上可用
第一种
Type : invoice.created
{
"field_1" : "",
"field_2" : "",
}
第二种
Type : invoice.paid
{
"field_5" : "",
"field_6" : "",
}
所以现在我的消费者需要根据数据类型有选择地处理消息。
Spring 有 @RabbitHandler
可以做到这一点...如果消息是由 spring 发布的。
但是我不能使用 @RabbitHandler
注释。
我认为这是因为 @RabbitHandler
正在根据遗留系统中不存在的 __TypeId__
header 转换消息。
如何模拟这种 @RabbitHandler
行为(根据其类型获取数据)?
所以我使用@RabbitListener
来消费消息。
但是 @RabbitListener
正在接收所有类型的消息。
我们使用 @RabbitListener
的另一个原因是我们的错误处理程序依赖于 Message
和 Channel
我们的基本方法签名是这样的:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
// convert message body JSON string to object
// process it
}
我正在尝试根据类型进行手动拒绝,这很有效。但是我确定当我有很多听众或 queues
时它是不可扩展的
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@Service
public class InvoiceListenerOnMethod {
private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice created : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice created : {}", message);
}
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice paid : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice paid : {}", message);
}
}
看,如果我有 4 条消息时 (paid-paid-created-created),侦听器可以运行超过 4 次,因为我们无法控制谁将接收哪条消息。所以它可以像这样 listenInvoicePaid()
- 拒绝()
- 拒绝()
- ack()
- 拒绝()
- ack()
并且在 listenInvoiceCreated()
中也可能发生 ack() 之前的多个 rejects()
因此,在正确处理所有消息之前,我总共可以调用 10 条左右的消息。
有什么修复代码的建议吗?
我没有使用过 spring rabbit 的集成,但是 all-in-all 有一个处理不同消息类型的单一 queue 的想法听起来有点问题:
许多消费者可能会收到他们无法处理的类型的消息,并且不得不拒绝它们,以便消息返回到 rabbit,然后一次又一次......所有集群的性能会因此加重。
所以我认为您可以遵循两条路径:
实现可以处理两种类型消息的单一侦听器。无需更改 Rabbit,但在 java 方面可能是一个具有挑战性的重构。
幸运的是,Rabbit MQ 在路由消息方面非常灵活。配置 exchange 以根据路由键将类型 A 的消息路由到 queue A 并将类型 B 的消息路由到 queue B,header 无论如何,Rabbit 中有不同类型的 Exchange,你您肯定会找到最适合您的配置。
我个人会走第二条路。
你可以在容器工厂的afterReceiveMessagePostProcessor
属性中添加一个MessagePostProcessor
。在 post 处理器中,您可以检查 JSON body()
并将 __TypeId__
header 设置为适当的 class 名称。
有关示例,请参阅 this answer。
可能的实现
这是天真的 if-else 方法,谢谢马克。这是您的建议(第一个选择)。
至于第二种选择,我做不到,因为发布者是遗留系统,我没有代码
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
@Header("type") String type) throws IOException {
if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
log.info("Delegate to invoice paid handler");
} else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
log.info("Delegate to invoice created handler");
} else {
log.info("Delegate to default handler");
}
}
第二个实施方案
感谢加里,这是我实现的。我认为这是更清洁的方法。接下来我只需要将消息 post 处理器提取到其他一些 class 以进行维护,这样我就不会弄乱我的 @RabbitListener
配置文件
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;
@Configuration
public class RabbitmqConfig {
@Bean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
var type = message.getMessageProperties().getHeaders().get("type").toString();
String typeId = null;
if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
typeId = InvoicePaidMessage.class.getName();
} else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
typeId = InvoiceCreatedMessage.class.getName();
}
Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));
return message;
}
});
return factory;
}
@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
听众
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;
@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {
private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
@RabbitHandler
public void listenInvoiceCreated(InvoiceCreatedMessage message) {
log.info("Listening invoice created : {}", message);
}
@RabbitHandler
public void listenInvoicePaid(InvoicePaidMessage message) {
log.info("Listening invoice paid : {}", message);
}
@RabbitHandler(isDefault = true)
public void listenDefault(Message message) {
log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
}
}
我有向 RabbitMQ 发送消息的遗留系统。
该系统只使用一个 queue : q.finance.invoice
但它有两种类型的消息,其中消息类型在 header.
第一种
Type : invoice.created
{
"field_1" : "",
"field_2" : "",
}
第二种
Type : invoice.paid
{
"field_5" : "",
"field_6" : "",
}
所以现在我的消费者需要根据数据类型有选择地处理消息。
Spring 有 @RabbitHandler
可以做到这一点...如果消息是由 spring 发布的。
但是我不能使用 @RabbitHandler
注释。
我认为这是因为 @RabbitHandler
正在根据遗留系统中不存在的 __TypeId__
header 转换消息。
如何模拟这种 @RabbitHandler
行为(根据其类型获取数据)?
所以我使用@RabbitListener
来消费消息。
但是 @RabbitListener
正在接收所有类型的消息。
我们使用 @RabbitListener
的另一个原因是我们的错误处理程序依赖于 Message
和 Channel
我们的基本方法签名是这样的:
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
// convert message body JSON string to object
// process it
}
我正在尝试根据类型进行手动拒绝,这很有效。但是我确定当我有很多听众或 queues
时它是不可扩展的import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@Service
public class InvoiceListenerOnMethod {
private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice created : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice created : {}", message);
}
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
log.warn("[on Method] Rejecting invoice paid : {}", message);
channel.basicReject(tag, true);
return;
}
log.info("[on Method] Listening invoice paid : {}", message);
}
}
看,如果我有 4 条消息时 (paid-paid-created-created),侦听器可以运行超过 4 次,因为我们无法控制谁将接收哪条消息。所以它可以像这样 listenInvoicePaid()
- 拒绝()
- 拒绝()
- ack()
- 拒绝()
- ack()
并且在 listenInvoiceCreated()
中也可能发生 ack() 之前的多个 rejects()
因此,在正确处理所有消息之前,我总共可以调用 10 条左右的消息。
有什么修复代码的建议吗?
我没有使用过 spring rabbit 的集成,但是 all-in-all 有一个处理不同消息类型的单一 queue 的想法听起来有点问题:
许多消费者可能会收到他们无法处理的类型的消息,并且不得不拒绝它们,以便消息返回到 rabbit,然后一次又一次......所有集群的性能会因此加重。
所以我认为您可以遵循两条路径:
实现可以处理两种类型消息的单一侦听器。无需更改 Rabbit,但在 java 方面可能是一个具有挑战性的重构。
幸运的是,Rabbit MQ 在路由消息方面非常灵活。配置 exchange 以根据路由键将类型 A 的消息路由到 queue A 并将类型 B 的消息路由到 queue B,header 无论如何,Rabbit 中有不同类型的 Exchange,你您肯定会找到最适合您的配置。
我个人会走第二条路。
你可以在容器工厂的afterReceiveMessagePostProcessor
属性中添加一个MessagePostProcessor
。在 post 处理器中,您可以检查 JSON body()
并将 __TypeId__
header 设置为适当的 class 名称。
有关示例,请参阅 this answer。
可能的实现
这是天真的 if-else 方法,谢谢马克。这是您的建议(第一个选择)。 至于第二种选择,我做不到,因为发布者是遗留系统,我没有代码
@RabbitListener(queues = "q.finance.invoice")
public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
@Header("type") String type) throws IOException {
if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
log.info("Delegate to invoice paid handler");
} else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
log.info("Delegate to invoice created handler");
} else {
log.info("Delegate to default handler");
}
}
第二个实施方案
感谢加里,这是我实现的。我认为这是更清洁的方法。接下来我只需要将消息 post 处理器提取到其他一些 class 以进行维护,这样我就不会弄乱我的 @RabbitListener
配置文件
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;
@Configuration
public class RabbitmqConfig {
@Bean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
var type = message.getMessageProperties().getHeaders().get("type").toString();
String typeId = null;
if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
typeId = InvoicePaidMessage.class.getName();
} else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
typeId = InvoiceCreatedMessage.class.getName();
}
Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));
return message;
}
});
return factory;
}
@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
听众
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;
@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {
private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
@RabbitHandler
public void listenInvoiceCreated(InvoiceCreatedMessage message) {
log.info("Listening invoice created : {}", message);
}
@RabbitHandler
public void listenInvoicePaid(InvoicePaidMessage message) {
log.info("Listening invoice paid : {}", message);
}
@RabbitHandler(isDefault = true)
public void listenDefault(Message message) {
log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
}
}