Spring RabbitMq 侦听器配置

Spring RabbitMq Listener Configuration

我们使用默认 spring 引导配置的 RabbitMq。我们有一个用例,其中我们不希望其中一个侦听器具有并行性。也就是说,我们只希望消费者的一个线程在任何给定时间点处于 运行。我们想要这样,因为用例的性质是我们希望按顺序使用消息,因此如果每个消费者有多个线程,则可能会乱序处理消息。 因为,我们使用的是默认值并且没有显式调整容器,所以我们使用 SimpleMessageListenerContainer。通过查看文档,我尝试使用 concurrency = "1" 修复消费者数量。目标方法上的注释如下所示 @RabbitListener(queues = ["queue-name"], concurrency = "1").

根据文档,这应该确保只有消费者线程。

{@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed * number of consumers in the {@code concurrentConsumers} property

2021-10-29 06:11:26.361 INFO  29752 --- [ntContainer#4-1] c.t.t.i.p.s.xxx         : Created xxx
2021-10-29 06:11:26.383 INFO  29752 --- [ntContainer#0-1] c.t.t.i.p.s.xxx         : Created xxx

这里要注意的ThreadId是[ntContainer#4-1][ntContainer#0-1].

所以问题是 - 我们如何确保在任何给定时间点每个消费者只有一个线程?

编辑:添加消费者代码class以获得更多上下文

@ConditionalOnProperty(value = ["rabbitmq.sharebooking.enabled"], havingValue = "true", matchIfMissing = false)
class ShareBookingConsumer @Autowired constructor(
    private val shareBookingRepository: ShareBookingRepository,
    private val objectMapper: ObjectMapper,
    private val shareDtoToShareBookingConverter: ShareBookingDtoToShareBookingConverter
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    init {
        logger.info("start sharebooking created consumer")
    }

    @RabbitListener(queues = ["tax_engine.share_booking"], concurrency = "1-1", exclusive = true)
    @Timed
    @Transactional
    fun consumeShareBookingCreatedEvent(message: Message) {
        try {
            consumeShareBookingCreatedEvent(message.body)
        } catch (e: Exception) {
            throw AmqpRejectAndDontRequeueException(e)
        }
    }

    private fun consumeShareBookingCreatedEvent(event: ByteArray) {
        toShareBookingCreationMessageEvent(event).let { creationEvent ->
            RmqMetrics.measureEventMetrics(creationEvent)
            val shareBooking = shareDtoToShareBookingConverter.convert(creationEvent.data)
            val persisted = shareBookingRepository.save(shareBooking)
            logger.info("Created shareBooking ${creationEvent.data.id}")
        }
    }

    private fun toShareBookingCreationMessageEvent(event: ByteArray) =
        objectMapper.readValue(event, shareBookingCreateEventType)

    companion object {
        private val shareBookingCreateEventType =
            object : TypeReference<RMQMessageEnvelope<ShareBookingCreationDto>>() {}
    }
}

Edit: Adding application thread analysis using visualvm
5 threads get created for 5 listeners.

  [1]: https://i.stack.imgur.com/gQINE.png

试试这个:

@RabbitListener(queues = ["queue-name"], exclusive = true)

https://docs.spring.io/spring-amqp/docs/current/reference/html/#exclusive-consumer

设置concurrency = "1-1"。注意Listener的并发不仅仅取决于concurrentConsumers,还取决于maxConcurrentConsumers:

如果您使用自定义工厂:

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setMaxConcurrentConsumers(1);
    return factory;
  }

有关详细信息,请参阅:https://docs.spring.io/spring-amqp/docs/current/reference/html/#simplemessagelistenercontainer

编辑: 我做了一个简单的测试,2个消费者&2个线程:

 @RabbitListener(queues = "myQueue111", concurrency = "1-1")
  public void handleMessage(Object message) throws InterruptedException {
    LOGGER.info("Received message : {} in {}", message, Thread.currentThread().getName());
  }

  @RabbitListener(queues = "myQueue222", concurrency = "1-1")
  public void handleMessag1e(Object message) throws InterruptedException {
    LOGGER.info("Received message222 : {} in {}", message, Thread.currentThread().getName());
  }