我可以在 @RabbitConsumer 中查明是否为该消费者预取了任何消息吗
Can I in a @RabbitConsumer find out if any messages are prefetched for this consumer
我需要知道是否有更多消息发送给该消费者。
现在我正在计算队列中的消息。但这只给我队列中剩余的内容,而不是预取的内容。
@RabbitListener(queues = QUEUENAME)
public void recieve(Message message, Channel channel) throws IOException {
long messagesOnQueue = channel.messageCount(QUEUENAME);
if(messagesOnQueue>1) {
//add message to list
}
else {
//save the list
}
}
如果有一种方法可以判断是否为该消费者预取了消息,那就太好了。那可能吗?如果我能得到那个计数,那么我不在乎队列中是否也有消息。
在收到 Gary 的建议后,我将实施更改为此,并且有效。
手动确认消息时,必须在收到消息的同一频道上完成。但是您可以保存对它的引用,以防您在另一个线程中需要它。
在你的 spring 引导 application.yml 添加这个
spring:
rabbitmq:
listener:
direct:
prefetch: 200
simple:
prefetch: 200
acknowledgeMode: MANUAL
来自消费者的代码。
//The list we build and save in one transaction
private Set<PayloadDto> unhandledPayloads = new HashSet<>();
private long latestTag = 0L;
private Channel latestChannel;
@RabbitListener(queues = QUEUE_NAME, id = "consumerId")
public void recieve(Message message, Channel channel) throws IOException {
PayloadDto payloadDto = parse(message.getBody());
unhandledPayloads.add(payloadDto);
latestTag = message.getMessageProperties().getDeliveryTag();
latestChannel = channel;
if (unhandledPayloads.size() > UNHANDLED_PAYLOADS_LIMIT) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
channel.basicAck(latestTag, true);
}
}
@EventListener(condition = "event.listenerId == 'consumerId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
if(!queue.isEmpty()) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
latestChannel.basicAck(latestTag, true);
}
}
我们尝试在保存之前建立列表的原因是能够进行批量插入以使其 运行 更快。
目前没有,但添加功能并不难。打开一个 github 问题来请求它。但是,我不确定它会有多大用处。如果队列中仍有消息,则使用预取将获取另一个。
我需要知道是否有更多消息发送给该消费者。
现在我正在计算队列中的消息。但这只给我队列中剩余的内容,而不是预取的内容。
@RabbitListener(queues = QUEUENAME)
public void recieve(Message message, Channel channel) throws IOException {
long messagesOnQueue = channel.messageCount(QUEUENAME);
if(messagesOnQueue>1) {
//add message to list
}
else {
//save the list
}
}
如果有一种方法可以判断是否为该消费者预取了消息,那就太好了。那可能吗?如果我能得到那个计数,那么我不在乎队列中是否也有消息。
在收到 Gary 的建议后,我将实施更改为此,并且有效。 手动确认消息时,必须在收到消息的同一频道上完成。但是您可以保存对它的引用,以防您在另一个线程中需要它。 在你的 spring 引导 application.yml 添加这个
spring:
rabbitmq:
listener:
direct:
prefetch: 200
simple:
prefetch: 200
acknowledgeMode: MANUAL
来自消费者的代码。
//The list we build and save in one transaction
private Set<PayloadDto> unhandledPayloads = new HashSet<>();
private long latestTag = 0L;
private Channel latestChannel;
@RabbitListener(queues = QUEUE_NAME, id = "consumerId")
public void recieve(Message message, Channel channel) throws IOException {
PayloadDto payloadDto = parse(message.getBody());
unhandledPayloads.add(payloadDto);
latestTag = message.getMessageProperties().getDeliveryTag();
latestChannel = channel;
if (unhandledPayloads.size() > UNHANDLED_PAYLOADS_LIMIT) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
channel.basicAck(latestTag, true);
}
}
@EventListener(condition = "event.listenerId == 'consumerId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
if(!queue.isEmpty()) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
latestChannel.basicAck(latestTag, true);
}
}
我们尝试在保存之前建立列表的原因是能够进行批量插入以使其 运行 更快。
目前没有,但添加功能并不难。打开一个 github 问题来请求它。但是,我不确定它会有多大用处。如果队列中仍有消息,则使用预取将获取另一个。