使用 Artemis、JBoss EAP 7.1 和 last-value-queue

Using Artemis, JBoss EAP 7.1, and last-value-queue

我们在 JBoss EAP 7.1 和 Spring 4.3.10 上使用 JMS 队列。

我们要将特定队列设置为 "last-value-queue",详见 here

如果我们注释掉消费者配置,即队列没有附加消费者,它将作为最后一个值队列工作,因此具有相同 _AMQ_LVQ_NAME 值的传入消息将取代预先存在的消息,并且消息不会累积。

如果我们保留消费者配置并使用 Thread.sleep() 模拟一个较长的 运行 进程,以便不立即消费除第一个消息之外的消息,具有相同 _AMQ_LVQ_NAME 值的消息将不会'丢弃,并累加到队列中。

你有什么线索吗?

我们在standalone.xml中的JBoss配置:

<?xml version="1.0" encoding="UTF-8"?>
<subsystem xmlns="urn:jboss:domain:messaging-activemq:2.0">
   <server name="default">
    ...
      <security-setting name="#">
         <role name="guest" send="true" consume="true" create-non-durable-queue="true" delete-non-durable-queue="true" />
      </security-setting>
      ...
      <address-setting name="jms.queue.NgedeRequestQueue" last-value-queue="true" address-full-policy="BLOCK" />
      <address-setting name="#" dead-letter-address="jms.queue.DLQ" expiry-address="jms.queue.ExpiryQueue" max-size-bytes="10485760" page-size-bytes="2097152" message-counter-history-day-limit="10" redistribution-delay="1000" />
      <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue" />
      <jms-queue name="DLQ" entries="java:/jms/queue/DLQ" />
      <jms-queue name="NgedeRequestQueue" entries="java:/jms/queue/NgedeRequestQueue" durable="true" />
      ...
      <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm" />
      <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" ha="true" block-on-acknowledge="true" reconnect-attempts="-1" />
      <connection-factory name="NgedeConnectionFactory" entries="java:jboss/exported/jms/NgedeConnectionFactory" connectors="http-connector" />
      <pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" transaction="xa" />
   </server>
</subsystem>

我们的Spring消费者配置:

<!-- Listener Definition -->
<bean id="messageListenerRequest" class="it.eng.ngede.jms.consumer.NationalApplicationRequestConsumer"/>

<bean id="NgedeRequestQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:/jms/queue/NgedeRequestQueue" />
    <property name="jndiTemplate" ref="jnditemplate" />
</bean>

<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:/ConnectionFactory" />
    <property name="jndiTemplate" ref="jnditemplate" />
</bean>

<bean id="jnditemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
        </props>
    </property>
</bean>

<!-- JmsContainer Definition -->
<bean id="jmsContainerRequest" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="NgedeRequestQueue" />
    <property name="messageListener" ref="messageListenerRequest" />
    <property name="concurrentConsumers" value="1" />
</bean>

您看到了预期的行为。如果连接了消费者,则默认情况下,由于消息缓冲,发送给代理的消息将立即发送给消费者。发送给消费者的消息被视为 "in delivery"(即它们已发送给客户端但尚未得到确认)。 "in delivery" 的消息不能被 "last value" 替换,因为此时它们基本上不受经纪人的控制。

如果您想要不同的行为,那么我建议您在连接 URL 上设置 consumerWindowSize=0,这样消费者就不会缓冲消息。当然,这可能会降低消息吞吐量,但这对您的用例来说可能并不重要。