仅从@RabbitListener 获取特定消息

Taking only specific message from @RabbitListener

我有向 RabbitMQ 发送消息的遗留系统。 该系统只使用一个 queue : q.finance.invoice 但它有两种类型的消息,其中消息类型在 header.

上可用

第一种

Type : invoice.created
{
  "field_1" : "",
  "field_2" : "",
}

第二种

Type : invoice.paid

{
  "field_5" : "",
  "field_6" : "",
}

所以现在我的消费者需要根据数据类型有选择地处理消息。 Spring 有 @RabbitHandler 可以做到这一点...如果消息是由 spring 发布的。 但是我不能使用 @RabbitHandler 注释。 我认为这是因为 @RabbitHandler 正在根据遗留系统中不存在的 __TypeId__ header 转换消息。

如何模拟这种 @RabbitHandler 行为(根据其类型获取数据)?

所以我使用@RabbitListener来消费消息。 但是 @RabbitListener 正在接收所有类型的消息。 我们使用 @RabbitListener 的另一个原因是我们的错误处理程序依赖于 MessageChannel 我们的基本方法签名是这样的:

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
   // convert message body JSON string to object
   // process it
}

我正在尝试根据类型进行手动拒绝,这很有效。但是我确定当我有很多听众或 queues

时它是不可扩展的
import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice paid : {}", message);
    }

}

看,如果我有 4 条消息时 (paid-paid-created-created),侦听器可以运行超过 4 次,因为我们无法控制谁将接收哪条消息。所以它可以像这样 listenInvoicePaid()

并且在 listenInvoiceCreated()
中也可能发生 ack() 之前的多个 rejects() 因此,在正确处理所有消息之前,我总共可以调用 10 条左右的消息。

有什么修复代码的建议吗?

我没有使用过 spring rabbit 的集成,但是 all-in-all 有一个处理不同消息类型的单一 queue 的想法听起来有点问题:

许多消费者可能会收到他们无法处理的类型的消息,并且不得不拒绝它们,以便消息返回到 rabbit,然后一次又一次......所有集群的性能会因此加重。

所以我认为您可以遵循两条路径:

  • 实现可以处理两种类型消息的单一侦听器。无需更改 Rabbit,但在 java 方面可能是一个具有挑战性的重构。

  • 幸运的是,Rabbit MQ 在路由消息方面非常灵活。配置 exchange 以根据路由键将类型 A 的消息路由到 queue A 并将类型 B 的消息路由到 queue B,header 无论如何,Rabbit 中有不同类型的 Exchange,你您肯定会找到最适合您的配置。

我个人会走第二条路。

你可以在容器工厂的afterReceiveMessagePostProcessor属性中添加一个MessagePostProcessor。在 post 处理器中,您可以检查 JSON body() 并将 __TypeId__ header 设置为适当的 class 名称。

有关示例,请参阅 this answer

可能的实现

这是天真的 if-else 方法,谢谢马克。这是您的建议(第一个选择)。 至于第二种选择,我做不到,因为发布者是遗留系统,我没有代码

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header("type") String type) throws IOException {
        if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
            log.info("Delegate to invoice paid handler");
        } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
            log.info("Delegate to invoice created handler");
        } else {
            log.info("Delegate to default handler");
        }
    }

第二个实施方案
感谢加里,这是我实现的。我认为这是更清洁的方法。接下来我只需要将消息 post 处理器提取到其他一些 class 以进行维护,这样我就不会弄乱我的 @RabbitListener

配置文件

import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Configuration
public class RabbitmqConfig {

    @Bean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                var type = message.getMessageProperties().getHeaders().get("type").toString();
                String typeId = null;

                if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                    typeId = InvoicePaidMessage.class.getName();
                } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                    typeId = InvoiceCreatedMessage.class.getName();
                }

                Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));

                return message;
            }

        });

        return factory;
    }

    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

}

听众

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.course.finance.message.invoice.InvoiceCreatedMessage;
import com.course.finance.message.invoice.InvoicePaidMessage;

@Service
@RabbitListener(queues = "q.finance.invoice")
public class InvoiceListener {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);

    @RabbitHandler
    public void listenInvoiceCreated(InvoiceCreatedMessage message) {
        log.info("Listening invoice created : {}", message);
    }

    @RabbitHandler
    public void listenInvoicePaid(InvoicePaidMessage message) {
        log.info("Listening invoice paid : {}", message);
    }

    @RabbitHandler(isDefault = true)
    public void listenDefault(Message message) {
        log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
    }

}