Spring - ActiveMQ - 持久订阅 - 关闭连接并重新订阅以获取离线消息

Spring - ActiveMQ - Durable Subscription - Close Connection and Resubscribe to get the offline messages

我想在 Spring-JMS 中使用 activeMQ 实现一个解决方案,我想在其中创建对主题的持久订阅。目的是如果订阅者关闭订阅一段时间并再次使用相同的客户端 ID 和订阅名称重新创建持久订阅,订阅者应该收到订阅关闭期间传递的所有消息。

我想为持久订阅实现 ORACLE URL 中提到的以下逻辑:https://docs.oracle.com/cd/E19798-01/821-1841/bncgd/index.html

但我无法使用 spring-jms 执行此操作。根据 URL,我需要获取 messageConsumer 实例并在该方法上调用 close() 以暂时停止从主题接收消息。但是不知道怎么弄。

以下是我的配置。请告诉我如何修改配置以执行此操作。

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">


<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:userName="admin"
    p:password="admin" 
    p:brokerURL="tcp://127.0.0.1:61616"
    primary="true"
    ></bean>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="adiTopic"/>
    <property name="messageListener" ref="adiListener"/>
</bean>

<bean id="configTemplate" class="org.springframework.jms.core.JmsTemplate" 
        p:connectionFactory-ref="connectionFactory"
        p:defaultDestination-ref="adiTopic" primary="true"
        p:pubSubDomain="true">
</bean>

<bean id="adiTopic" class="org.apache.activemq.command.ActiveMQTopic" p:physicalName="gcaa.adi.topic"></bean>

<bean id="adiListener" class="com.gcaa.asset.manager.impl.AdiListener"></bean>

为什么不调用 DefaultMessageListenerContainer.stop(); 来停止容器和消费者?

您可以将 jmsContainer 注入另一个 bean 并在需要时关闭它,稍后再调用 start()。

当您的持久消费者离线时发送给代理的所有消息都将被存储,直到它重新连接。

要使订阅持久化,您需要将其添加到 jmsContainer bean

    <property name="subscriptionDurable" value="true" />
    <property name="cacheLevel" value="1" />

您可以添加 subscriptionName 或将使用指定消息侦听器的 class 名称。

您可以在 connectionFactory

中添加一个 clientID
    <property name="clientID" value="${jms.clientId}" />

或使用

<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleConnectionFactory"> <constructor-arg ref="connectionFactory" /> <property name="reconnectOnException" value="true" /> <property name="clientId" value="${jms.clientId}" /> </bean>

并更新 jmsContainer

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" p:durableSubscriptionName="gxaa-durable1" p:clientId="gxaa-client1"> <property name="connectionFactory" ref="singleConnectionFactory" /> <property name="destination" ref="adiTopic" /> <property name="messageListener" ref="adiListener" /> <property name="subscriptionDurable" value="true" /> <property name="cacheLevel" value="1" /> </bean>

更新:

如果你的 adiListener 实现 org.springframework.jms.listener.SessionAwareMessageListener 它必须定义方法 onMessage(M message, Session session) 并且当你有会话时你可以调用 javax.jms.Session.unsubscribe(String subscriptionName)

subscriptionName在上面定义,可以注入到这个bean中或者可以使用指定消息监听器的class名称。