ActiveMQ:调度队列包含的消息多于预取大小
ActiveMQ: Dispatched queue contains more messages then prefetch size
我将预取大小设置为 1(jms.prefetchPolicy.all=1 in url)。在 Web 控制台中,我可以看到所有消费者的预取都是 1。一位消费者卡住了,他的调度队列中有 67 条消息 - 请参阅我的 screenshot
你能帮我理解这是怎么发生的吗?我已经阅读了很多这方面的文章,我的理解是调度队列大小应该达到预取大小?!
我使用以下配置来使用队列中的消息:
ConnectionFactory getActiveMQConnectionFactory() {
// Configure the ActiveMQConnectionFactory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerUrl);
activeMQConnectionFactory.setUserName(user);
activeMQConnectionFactory.setPassword(password);
activeMQConnectionFactory.setNonBlockingRedelivery(true);
// Configure the redeliver policy and the dead letter queue
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelay);
redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
redeliveryPolicy.setUseExponentialBackOff(useExponentialBackOff);
redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
redeliveryPolicyMap.put(new ActiveMQQueue(thumbnailQueue), redeliveryPolicy);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
public IntegrationFlow createThumbnailFlow(String concurrency, CreateThumbnailReceiver receiver) {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(
Jms.container(getActiveMQConnectionFactory(), thumbnailQueue)
.concurrency(concurrency)
.sessionTransacted(true)
.get()
))
.transform(new JsonToObjectTransformer(CreateThumbnailRequest.class, jsonObjectMapper()))
.handle(receiver)
.get();
}
问题是由于 broker (5.14.5) 和 client (5.15.3) 版本不同造成的。升级代理后,分派队列按预期最多包含 2 条消息。
我将预取大小设置为 1(jms.prefetchPolicy.all=1 in url)。在 Web 控制台中,我可以看到所有消费者的预取都是 1。一位消费者卡住了,他的调度队列中有 67 条消息 - 请参阅我的 screenshot
你能帮我理解这是怎么发生的吗?我已经阅读了很多这方面的文章,我的理解是调度队列大小应该达到预取大小?!
我使用以下配置来使用队列中的消息:
ConnectionFactory getActiveMQConnectionFactory() {
// Configure the ActiveMQConnectionFactory
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerUrl);
activeMQConnectionFactory.setUserName(user);
activeMQConnectionFactory.setPassword(password);
activeMQConnectionFactory.setNonBlockingRedelivery(true);
// Configure the redeliver policy and the dead letter queue
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelay);
redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
redeliveryPolicy.setUseExponentialBackOff(useExponentialBackOff);
redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
redeliveryPolicyMap.put(new ActiveMQQueue(thumbnailQueue), redeliveryPolicy);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
public IntegrationFlow createThumbnailFlow(String concurrency, CreateThumbnailReceiver receiver) {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(
Jms.container(getActiveMQConnectionFactory(), thumbnailQueue)
.concurrency(concurrency)
.sessionTransacted(true)
.get()
))
.transform(new JsonToObjectTransformer(CreateThumbnailRequest.class, jsonObjectMapper()))
.handle(receiver)
.get();
}
问题是由于 broker (5.14.5) 和 client (5.15.3) 版本不同造成的。升级代理后,分派队列按预期最多包含 2 条消息。