Apache QPID JMS 客户端 - 设置预取不起作用

Apache QPID JMS Client - setting prefetch doesn't work

我有一个 JmsConnectionFactory 配置了这样的 URI:

failover:(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true&jms.prefetchPolicy.all=10)?failover.maxReconnectAttempts=20

注意jms.prefetchPolicy.all=10参数,根据官方documentation

... controls how many messages the remote peer can send to the client and be held in a prefetch buffer for each consumer instance.

所以我应该不会看到客户端中缓冲的消息超过 10 条,对吧?好吧,那是行不通的。

我最终使用反射定期打印每个 JmsMessageConsumer:

MessageQueue.size()
MessageConsumer messageConsumer = ...
Field field = JmsMessageConsumer.class.getDeclaredField("messageQueue");
field.setAccessible(true);
MessageQueue q = (MessageQueue) field.get(messageConsumer);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> System.out.println(q.size()), 10, 10, TimeUnit.SECONDS);

当我的消息处理程序很慢(或被阻塞)时,我看到队列大小略小于 1000 条消息,这是默认的预取大小。

那么 - 这是一个错误吗?我该如何设置不同的预取大小?

我正在使用 qpid-jms-client,版本 0.27.0

想通了。

JmsConnectionFactory 有一个 JmsPrefetchPolicy。我看不出应该如何使用 URI 参数设置它,但可以使用 JmsConnectionFactory.setPrefetchPolicy() 像这样设置它:

JmsConnectionFactory cf = ...
JmsDefaultPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
prefetchPolicy.setAll(123); // Set prefetch size here
cf.setPrefetchPolicy(prefetchPolicy);

还没有在任何地方看到这个记录。

你的URI不正确,应该是:

failover://(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true)
    ?jms.prefetchPolicy.all=10&failover.maxReconnectAttempts=20

JMS 选项是全局的,因此它们与故障转移选项一起应用于 URI 的最外层部分。包装的 AMQP 连接 URI 仅包含控制每个特定连接的传输选项。