SpringBoot:手动确认多个 AMQP 消息
SpringBoot: Manually ack multiple AMQP messages
我需要在兔子侦听器中手动确认多条消息只有在它们被成功处理和存储后。 Spring使用的启动配置如下
listener:
concurrency: 2
max-concurrency: 20
acknowledge-mode: manual
prefetch: 30
消息应一次存储 20 条。只有当它们被成功存储时,才应该发送多重确认。还有与存储机制相关的超时,即使没有 20 条消息,它也应该在 20 秒后存储消息。目前,我有以下代码
@Slf4j
@Component
class EventListener {
@Autowired
private EventsStorage eventsStorage
private ConcurrentMap<Integer, ChannelData> channelEvents = new ConcurrentHashMap<>()
@RabbitListener(queues = 'event-queue')
void processEvent(@Payload Event event, Channel channel, @Header(DELIVERY_TAG) long tag) {
log.debug("Event received for channel $channel.channelNumber")
channelEvents.compute(channel.channelNumber, { k, channelData -> addEventAndStoreIfNeeded(channel, event, tag, channelData) })
}
private ChannelData addEventAndStoreIfNeeded(Channel channel, Event event, long tag, ChannelData channelData) {
if (channelData) {
channelData.addEvent(tag, event)
if (channelData.getDeliveredEvents().size() >= batchSize) {
storeAndAckChannelEvents(channel.channelNumber)
}
return channelData
} else {
ChannelData newChannelData = new ChannelData(channel)
newChannelData.addEvent(tag, event)
return newChannelData
}
}
void storeAndAckChannelEvents(Integer channelNumber) {
channelEvents.compute(channelNumber, { k, channelData ->
List<DeliveredEvent> deliveredEvents = channelData.deliveredEvents
if (!deliveredEvents.isEmpty()) {
def events = deliveredEvents.stream()
.map({ DeliveredEvent deliveredEvent -> deliveredEvent.event })
.collect(Collectors.toList())
eventsStorage.store(events)
long lastDeliveryTag = deliveredEvents.get(deliveredEvents.size() - 1).deliveryTag
channelData.channel.basicAck(lastDeliveryTag, true)
deliveredEvents.clear()
}
})
}
@Scheduled(fixedRate = 20000L)
void storeMessagingEvents() {
channelEvents.forEach({ k, channelData -> storeAndAckChannelEvents(channelData) })
}
}
其中ChannelData
和DeliveredEvent
如下
class DeliveredMesssagingEvent {
int deliveryTag
Event event
}
class ChannelData {
Channel channel
List<DeliveredEvent> deliveredEvents = new ArrayList<>()
ChannelData(Channel channel) {
this.channel = channel
}
void addEvent(long tag, Event event) {
deliveredEvents.add(new DeliveredEvent(deliveryTag: tag, event: event))
}
}
使用的Channel
是com.rabbitmq.client.Channel
。 The docs关于这个界面状态:
Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
所以,我做的恰恰相反,在 Scheduler
和 SimpleMessageListenerContainer
工作线程之间共享 Channel。我的应用程序的输出是这样的:
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[SimpleAsyncTaskExecutor-4] DEBUG EventListener - Event received for channel 3
[SimpleAsyncTaskExecutor-5] DEBUG EventListener - Event received for channel 1
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[pool-4-thread-1] DEBUG EventListener - Storing channel 5 events
[pool-4-thread-1] DEBUG EventListener - Storing channel 2 events
...
SimpleMessageListenerContainer
worker-threads 有自己的通道,不会随时间改变。
考虑到我同步了 Scheduler
和 SimpleMessageListenerContainer
工作线程,有人看到这段代码不是线程安全的原因吗?
我应该尝试在 Spring 引导中手动确认多条消息的其他方法吗?
只要同步线程就可以了。
但请记住,如果连接丢失,您将获得一个新的消费者并且您的同步线程将拥有陈旧的数据(未确认的消息将被重新传送)。
不过,您也可以使用 container idle events。
当消费者线程空闲一段时间后,事件将发布在同一个侦听器线程上,因此您可以在那里进行定时确认,而不必担心同步问题。
如果连接丢失,您还可以检测consumer failed events。
我需要在兔子侦听器中手动确认多条消息只有在它们被成功处理和存储后。 Spring使用的启动配置如下
listener:
concurrency: 2
max-concurrency: 20
acknowledge-mode: manual
prefetch: 30
消息应一次存储 20 条。只有当它们被成功存储时,才应该发送多重确认。还有与存储机制相关的超时,即使没有 20 条消息,它也应该在 20 秒后存储消息。目前,我有以下代码
@Slf4j
@Component
class EventListener {
@Autowired
private EventsStorage eventsStorage
private ConcurrentMap<Integer, ChannelData> channelEvents = new ConcurrentHashMap<>()
@RabbitListener(queues = 'event-queue')
void processEvent(@Payload Event event, Channel channel, @Header(DELIVERY_TAG) long tag) {
log.debug("Event received for channel $channel.channelNumber")
channelEvents.compute(channel.channelNumber, { k, channelData -> addEventAndStoreIfNeeded(channel, event, tag, channelData) })
}
private ChannelData addEventAndStoreIfNeeded(Channel channel, Event event, long tag, ChannelData channelData) {
if (channelData) {
channelData.addEvent(tag, event)
if (channelData.getDeliveredEvents().size() >= batchSize) {
storeAndAckChannelEvents(channel.channelNumber)
}
return channelData
} else {
ChannelData newChannelData = new ChannelData(channel)
newChannelData.addEvent(tag, event)
return newChannelData
}
}
void storeAndAckChannelEvents(Integer channelNumber) {
channelEvents.compute(channelNumber, { k, channelData ->
List<DeliveredEvent> deliveredEvents = channelData.deliveredEvents
if (!deliveredEvents.isEmpty()) {
def events = deliveredEvents.stream()
.map({ DeliveredEvent deliveredEvent -> deliveredEvent.event })
.collect(Collectors.toList())
eventsStorage.store(events)
long lastDeliveryTag = deliveredEvents.get(deliveredEvents.size() - 1).deliveryTag
channelData.channel.basicAck(lastDeliveryTag, true)
deliveredEvents.clear()
}
})
}
@Scheduled(fixedRate = 20000L)
void storeMessagingEvents() {
channelEvents.forEach({ k, channelData -> storeAndAckChannelEvents(channelData) })
}
}
其中ChannelData
和DeliveredEvent
如下
class DeliveredMesssagingEvent {
int deliveryTag
Event event
}
class ChannelData {
Channel channel
List<DeliveredEvent> deliveredEvents = new ArrayList<>()
ChannelData(Channel channel) {
this.channel = channel
}
void addEvent(long tag, Event event) {
deliveredEvents.add(new DeliveredEvent(deliveryTag: tag, event: event))
}
}
使用的Channel
是com.rabbitmq.client.Channel
。 The docs关于这个界面状态:
Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
所以,我做的恰恰相反,在 Scheduler
和 SimpleMessageListenerContainer
工作线程之间共享 Channel。我的应用程序的输出是这样的:
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[SimpleAsyncTaskExecutor-4] DEBUG EventListener - Event received for channel 3
[SimpleAsyncTaskExecutor-5] DEBUG EventListener - Event received for channel 1
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[pool-4-thread-1] DEBUG EventListener - Storing channel 5 events
[pool-4-thread-1] DEBUG EventListener - Storing channel 2 events
...
SimpleMessageListenerContainer
worker-threads 有自己的通道,不会随时间改变。
考虑到我同步了 Scheduler
和 SimpleMessageListenerContainer
工作线程,有人看到这段代码不是线程安全的原因吗?
我应该尝试在 Spring 引导中手动确认多条消息的其他方法吗?
只要同步线程就可以了。
但请记住,如果连接丢失,您将获得一个新的消费者并且您的同步线程将拥有陈旧的数据(未确认的消息将被重新传送)。
不过,您也可以使用 container idle events。
当消费者线程空闲一段时间后,事件将发布在同一个侦听器线程上,因此您可以在那里进行定时确认,而不必担心同步问题。
如果连接丢失,您还可以检测consumer failed events。