Apache QPID JMS 客户端 - 设置预取不起作用
Apache QPID JMS Client - setting prefetch doesn't work
我有一个 JmsConnectionFactory
配置了这样的 URI:
failover:(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true&jms.prefetchPolicy.all=10)?failover.maxReconnectAttempts=20
注意jms.prefetchPolicy.all=10
参数,根据官方documentation
... controls how many messages the remote peer can
send to the client and be held in a prefetch buffer for each consumer
instance.
所以我应该不会看到客户端中缓冲的消息超过 10 条,对吧?好吧,那是行不通的。
我最终使用反射定期打印每个 JmsMessageConsumer
:
的 MessageQueue.size()
MessageConsumer messageConsumer = ...
Field field = JmsMessageConsumer.class.getDeclaredField("messageQueue");
field.setAccessible(true);
MessageQueue q = (MessageQueue) field.get(messageConsumer);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> System.out.println(q.size()), 10, 10, TimeUnit.SECONDS);
当我的消息处理程序很慢(或被阻塞)时,我看到队列大小略小于 1000 条消息,这是默认的预取大小。
那么 - 这是一个错误吗?我该如何设置不同的预取大小?
我正在使用 qpid-jms-client
,版本 0.27.0
。
想通了。
JmsConnectionFactory
有一个 JmsPrefetchPolicy
。我看不出应该如何使用 URI 参数设置它,但可以使用 JmsConnectionFactory.setPrefetchPolicy()
像这样设置它:
JmsConnectionFactory cf = ...
JmsDefaultPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
prefetchPolicy.setAll(123); // Set prefetch size here
cf.setPrefetchPolicy(prefetchPolicy);
还没有在任何地方看到这个记录。
你的URI不正确,应该是:
failover://(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true)
?jms.prefetchPolicy.all=10&failover.maxReconnectAttempts=20
JMS 选项是全局的,因此它们与故障转移选项一起应用于 URI 的最外层部分。包装的 AMQP 连接 URI 仅包含控制每个特定连接的传输选项。
我有一个 JmsConnectionFactory
配置了这样的 URI:
failover:(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true&jms.prefetchPolicy.all=10)?failover.maxReconnectAttempts=20
注意jms.prefetchPolicy.all=10
参数,根据官方documentation
... controls how many messages the remote peer can send to the client and be held in a prefetch buffer for each consumer instance.
所以我应该不会看到客户端中缓冲的消息超过 10 条,对吧?好吧,那是行不通的。
我最终使用反射定期打印每个 JmsMessageConsumer
:
MessageQueue.size()
MessageConsumer messageConsumer = ...
Field field = JmsMessageConsumer.class.getDeclaredField("messageQueue");
field.setAccessible(true);
MessageQueue q = (MessageQueue) field.get(messageConsumer);
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> System.out.println(q.size()), 10, 10, TimeUnit.SECONDS);
当我的消息处理程序很慢(或被阻塞)时,我看到队列大小略小于 1000 条消息,这是默认的预取大小。
那么 - 这是一个错误吗?我该如何设置不同的预取大小?
我正在使用 qpid-jms-client
,版本 0.27.0
。
想通了。
JmsConnectionFactory
有一个 JmsPrefetchPolicy
。我看不出应该如何使用 URI 参数设置它,但可以使用 JmsConnectionFactory.setPrefetchPolicy()
像这样设置它:
JmsConnectionFactory cf = ...
JmsDefaultPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
prefetchPolicy.setAll(123); // Set prefetch size here
cf.setPrefetchPolicy(prefetchPolicy);
还没有在任何地方看到这个记录。
你的URI不正确,应该是:
failover://(amqps://11.22.33.44?amqp.idleTimeout=120000&transport.tcpKeepAlive=true)
?jms.prefetchPolicy.all=10&failover.maxReconnectAttempts=20
JMS 选项是全局的,因此它们与故障转移选项一起应用于 URI 的最外层部分。包装的 AMQP 连接 URI 仅包含控制每个特定连接的传输选项。