使用 Artemis、JBoss EAP 7.1 和 last-value-queue
Using Artemis, JBoss EAP 7.1, and last-value-queue
我们在 JBoss EAP 7.1 和 Spring 4.3.10 上使用 JMS 队列。
我们要将特定队列设置为 "last-value-queue",详见 here。
如果我们注释掉消费者配置,即队列没有附加消费者,它将作为最后一个值队列工作,因此具有相同 _AMQ_LVQ_NAME
值的传入消息将取代预先存在的消息,并且消息不会累积。
如果我们保留消费者配置并使用 Thread.sleep()
模拟一个较长的 运行 进程,以便不立即消费除第一个消息之外的消息,具有相同 _AMQ_LVQ_NAME
值的消息将不会'丢弃,并累加到队列中。
你有什么线索吗?
我们在standalone.xml
中的JBoss配置:
<?xml version="1.0" encoding="UTF-8"?>
<subsystem xmlns="urn:jboss:domain:messaging-activemq:2.0">
<server name="default">
...
<security-setting name="#">
<role name="guest" send="true" consume="true" create-non-durable-queue="true" delete-non-durable-queue="true" />
</security-setting>
...
<address-setting name="jms.queue.NgedeRequestQueue" last-value-queue="true" address-full-policy="BLOCK" />
<address-setting name="#" dead-letter-address="jms.queue.DLQ" expiry-address="jms.queue.ExpiryQueue" max-size-bytes="10485760" page-size-bytes="2097152" message-counter-history-day-limit="10" redistribution-delay="1000" />
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue" />
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ" />
<jms-queue name="NgedeRequestQueue" entries="java:/jms/queue/NgedeRequestQueue" durable="true" />
...
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm" />
<connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" ha="true" block-on-acknowledge="true" reconnect-attempts="-1" />
<connection-factory name="NgedeConnectionFactory" entries="java:jboss/exported/jms/NgedeConnectionFactory" connectors="http-connector" />
<pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" transaction="xa" />
</server>
</subsystem>
我们的Spring消费者配置:
<!-- Listener Definition -->
<bean id="messageListenerRequest" class="it.eng.ngede.jms.consumer.NationalApplicationRequestConsumer"/>
<bean id="NgedeRequestQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:/jms/queue/NgedeRequestQueue" />
<property name="jndiTemplate" ref="jnditemplate" />
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:/ConnectionFactory" />
<property name="jndiTemplate" ref="jnditemplate" />
</bean>
<bean id="jnditemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
</props>
</property>
</bean>
<!-- JmsContainer Definition -->
<bean id="jmsContainerRequest" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="NgedeRequestQueue" />
<property name="messageListener" ref="messageListenerRequest" />
<property name="concurrentConsumers" value="1" />
</bean>
您看到了预期的行为。如果连接了消费者,则默认情况下,由于消息缓冲,发送给代理的消息将立即发送给消费者。发送给消费者的消息被视为 "in delivery"(即它们已发送给客户端但尚未得到确认)。 "in delivery" 的消息不能被 "last value" 替换,因为此时它们基本上不受经纪人的控制。
如果您想要不同的行为,那么我建议您在连接 URL 上设置 consumerWindowSize=0
,这样消费者就不会缓冲消息。当然,这可能会降低消息吞吐量,但这对您的用例来说可能并不重要。
我们在 JBoss EAP 7.1 和 Spring 4.3.10 上使用 JMS 队列。
我们要将特定队列设置为 "last-value-queue",详见 here。
如果我们注释掉消费者配置,即队列没有附加消费者,它将作为最后一个值队列工作,因此具有相同 _AMQ_LVQ_NAME
值的传入消息将取代预先存在的消息,并且消息不会累积。
如果我们保留消费者配置并使用 Thread.sleep()
模拟一个较长的 运行 进程,以便不立即消费除第一个消息之外的消息,具有相同 _AMQ_LVQ_NAME
值的消息将不会'丢弃,并累加到队列中。
你有什么线索吗?
我们在standalone.xml
中的JBoss配置:
<?xml version="1.0" encoding="UTF-8"?>
<subsystem xmlns="urn:jboss:domain:messaging-activemq:2.0">
<server name="default">
...
<security-setting name="#">
<role name="guest" send="true" consume="true" create-non-durable-queue="true" delete-non-durable-queue="true" />
</security-setting>
...
<address-setting name="jms.queue.NgedeRequestQueue" last-value-queue="true" address-full-policy="BLOCK" />
<address-setting name="#" dead-letter-address="jms.queue.DLQ" expiry-address="jms.queue.ExpiryQueue" max-size-bytes="10485760" page-size-bytes="2097152" message-counter-history-day-limit="10" redistribution-delay="1000" />
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue" />
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ" />
<jms-queue name="NgedeRequestQueue" entries="java:/jms/queue/NgedeRequestQueue" durable="true" />
...
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm" />
<connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" ha="true" block-on-acknowledge="true" reconnect-attempts="-1" />
<connection-factory name="NgedeConnectionFactory" entries="java:jboss/exported/jms/NgedeConnectionFactory" connectors="http-connector" />
<pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" transaction="xa" />
</server>
</subsystem>
我们的Spring消费者配置:
<!-- Listener Definition -->
<bean id="messageListenerRequest" class="it.eng.ngede.jms.consumer.NationalApplicationRequestConsumer"/>
<bean id="NgedeRequestQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:/jms/queue/NgedeRequestQueue" />
<property name="jndiTemplate" ref="jnditemplate" />
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="java:/ConnectionFactory" />
<property name="jndiTemplate" ref="jnditemplate" />
</bean>
<bean id="jnditemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
</props>
</property>
</bean>
<!-- JmsContainer Definition -->
<bean id="jmsContainerRequest" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="NgedeRequestQueue" />
<property name="messageListener" ref="messageListenerRequest" />
<property name="concurrentConsumers" value="1" />
</bean>
您看到了预期的行为。如果连接了消费者,则默认情况下,由于消息缓冲,发送给代理的消息将立即发送给消费者。发送给消费者的消息被视为 "in delivery"(即它们已发送给客户端但尚未得到确认)。 "in delivery" 的消息不能被 "last value" 替换,因为此时它们基本上不受经纪人的控制。
如果您想要不同的行为,那么我建议您在连接 URL 上设置 consumerWindowSize=0
,这样消费者就不会缓冲消息。当然,这可能会降低消息吞吐量,但这对您的用例来说可能并不重要。