Spring RabbitMq 侦听器配置
Spring RabbitMq Listener Configuration
我们使用默认 spring 引导配置的 RabbitMq。我们有一个用例,其中我们不希望其中一个侦听器具有并行性。也就是说,我们只希望消费者的一个线程在任何给定时间点处于 运行。我们想要这样,因为用例的性质是我们希望按顺序使用消息,因此如果每个消费者有多个线程,则可能会乱序处理消息。
因为,我们使用的是默认值并且没有显式调整容器,所以我们使用 SimpleMessageListenerContainer
。通过查看文档,我尝试使用 concurrency = "1"
修复消费者数量。目标方法上的注释如下所示 @RabbitListener(queues = ["queue-name"], concurrency = "1")
.
根据文档,这应该确保只有消费者线程。
{@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
* SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed
* number of consumers in the {@code concurrentConsumers} property
2021-10-29 06:11:26.361 INFO 29752 --- [ntContainer#4-1] c.t.t.i.p.s.xxx : Created xxx
2021-10-29 06:11:26.383 INFO 29752 --- [ntContainer#0-1] c.t.t.i.p.s.xxx : Created xxx
这里要注意的ThreadId是[ntContainer#4-1]
和[ntContainer#0-1]
.
所以问题是 - 我们如何确保在任何给定时间点每个消费者只有一个线程?
编辑:添加消费者代码class以获得更多上下文
@ConditionalOnProperty(value = ["rabbitmq.sharebooking.enabled"], havingValue = "true", matchIfMissing = false)
class ShareBookingConsumer @Autowired constructor(
private val shareBookingRepository: ShareBookingRepository,
private val objectMapper: ObjectMapper,
private val shareDtoToShareBookingConverter: ShareBookingDtoToShareBookingConverter
) {
private val logger = LoggerFactory.getLogger(javaClass)
init {
logger.info("start sharebooking created consumer")
}
@RabbitListener(queues = ["tax_engine.share_booking"], concurrency = "1-1", exclusive = true)
@Timed
@Transactional
fun consumeShareBookingCreatedEvent(message: Message) {
try {
consumeShareBookingCreatedEvent(message.body)
} catch (e: Exception) {
throw AmqpRejectAndDontRequeueException(e)
}
}
private fun consumeShareBookingCreatedEvent(event: ByteArray) {
toShareBookingCreationMessageEvent(event).let { creationEvent ->
RmqMetrics.measureEventMetrics(creationEvent)
val shareBooking = shareDtoToShareBookingConverter.convert(creationEvent.data)
val persisted = shareBookingRepository.save(shareBooking)
logger.info("Created shareBooking ${creationEvent.data.id}")
}
}
private fun toShareBookingCreationMessageEvent(event: ByteArray) =
objectMapper.readValue(event, shareBookingCreateEventType)
companion object {
private val shareBookingCreateEventType =
object : TypeReference<RMQMessageEnvelope<ShareBookingCreationDto>>() {}
}
}
Edit: Adding application thread analysis using visualvm
5 threads get created for 5 listeners.
[1]: https://i.stack.imgur.com/gQINE.png
试试这个:
@RabbitListener(queues = ["queue-name"], exclusive = true)
见https://docs.spring.io/spring-amqp/docs/current/reference/html/#exclusive-consumer
设置concurrency = "1-1"
。注意Listener的并发不仅仅取决于concurrentConsumers
,还取决于maxConcurrentConsumers
:
如果您使用自定义工厂:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
有关详细信息,请参阅:https://docs.spring.io/spring-amqp/docs/current/reference/html/#simplemessagelistenercontainer。
编辑:
我做了一个简单的测试,2个消费者&2个线程:
@RabbitListener(queues = "myQueue111", concurrency = "1-1")
public void handleMessage(Object message) throws InterruptedException {
LOGGER.info("Received message : {} in {}", message, Thread.currentThread().getName());
}
@RabbitListener(queues = "myQueue222", concurrency = "1-1")
public void handleMessag1e(Object message) throws InterruptedException {
LOGGER.info("Received message222 : {} in {}", message, Thread.currentThread().getName());
}
我们使用默认 spring 引导配置的 RabbitMq。我们有一个用例,其中我们不希望其中一个侦听器具有并行性。也就是说,我们只希望消费者的一个线程在任何给定时间点处于 运行。我们想要这样,因为用例的性质是我们希望按顺序使用消息,因此如果每个消费者有多个线程,则可能会乱序处理消息。
因为,我们使用的是默认值并且没有显式调整容器,所以我们使用 SimpleMessageListenerContainer
。通过查看文档,我尝试使用 concurrency = "1"
修复消费者数量。目标方法上的注释如下所示 @RabbitListener(queues = ["queue-name"], concurrency = "1")
.
根据文档,这应该确保只有消费者线程。
{@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed * number of consumers in the {@code concurrentConsumers} property
2021-10-29 06:11:26.361 INFO 29752 --- [ntContainer#4-1] c.t.t.i.p.s.xxx : Created xxx
2021-10-29 06:11:26.383 INFO 29752 --- [ntContainer#0-1] c.t.t.i.p.s.xxx : Created xxx
这里要注意的ThreadId是[ntContainer#4-1]
和[ntContainer#0-1]
.
所以问题是 - 我们如何确保在任何给定时间点每个消费者只有一个线程?
编辑:添加消费者代码class以获得更多上下文
@ConditionalOnProperty(value = ["rabbitmq.sharebooking.enabled"], havingValue = "true", matchIfMissing = false)
class ShareBookingConsumer @Autowired constructor(
private val shareBookingRepository: ShareBookingRepository,
private val objectMapper: ObjectMapper,
private val shareDtoToShareBookingConverter: ShareBookingDtoToShareBookingConverter
) {
private val logger = LoggerFactory.getLogger(javaClass)
init {
logger.info("start sharebooking created consumer")
}
@RabbitListener(queues = ["tax_engine.share_booking"], concurrency = "1-1", exclusive = true)
@Timed
@Transactional
fun consumeShareBookingCreatedEvent(message: Message) {
try {
consumeShareBookingCreatedEvent(message.body)
} catch (e: Exception) {
throw AmqpRejectAndDontRequeueException(e)
}
}
private fun consumeShareBookingCreatedEvent(event: ByteArray) {
toShareBookingCreationMessageEvent(event).let { creationEvent ->
RmqMetrics.measureEventMetrics(creationEvent)
val shareBooking = shareDtoToShareBookingConverter.convert(creationEvent.data)
val persisted = shareBookingRepository.save(shareBooking)
logger.info("Created shareBooking ${creationEvent.data.id}")
}
}
private fun toShareBookingCreationMessageEvent(event: ByteArray) =
objectMapper.readValue(event, shareBookingCreateEventType)
companion object {
private val shareBookingCreateEventType =
object : TypeReference<RMQMessageEnvelope<ShareBookingCreationDto>>() {}
}
}
Edit: Adding application thread analysis using visualvm
5 threads get created for 5 listeners.
[1]: https://i.stack.imgur.com/gQINE.png
试试这个:
@RabbitListener(queues = ["queue-name"], exclusive = true)
见https://docs.spring.io/spring-amqp/docs/current/reference/html/#exclusive-consumer
设置concurrency = "1-1"
。注意Listener的并发不仅仅取决于concurrentConsumers
,还取决于maxConcurrentConsumers
:
如果您使用自定义工厂:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
有关详细信息,请参阅:https://docs.spring.io/spring-amqp/docs/current/reference/html/#simplemessagelistenercontainer。
编辑: 我做了一个简单的测试,2个消费者&2个线程:
@RabbitListener(queues = "myQueue111", concurrency = "1-1")
public void handleMessage(Object message) throws InterruptedException {
LOGGER.info("Received message : {} in {}", message, Thread.currentThread().getName());
}
@RabbitListener(queues = "myQueue222", concurrency = "1-1")
public void handleMessag1e(Object message) throws InterruptedException {
LOGGER.info("Received message222 : {} in {}", message, Thread.currentThread().getName());
}