Spring 中的 JMS 主题订阅者使用 JMS 模板/消息订阅者
JMS Topic Subscriber in Spring using JMS template/ Message Subscriber
我有一个简单的 Spring JMS 应用程序 Producer/Subscriber 使用具有以下配置的 ActiveMQ :
应用程序上下文 xml:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="userName" value="user" />
<property name="password" value="password" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="messageQueue1" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE">
</property>
</bean>
<bean id="springJmsProducer" class="SpringJmsProducer">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="springJmsConsumer" class="SpringJmsConsumer">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
以下是Spring制作人
public class SpringJmsProducer {
private JmsTemplate jmsTemplate;
private Destination destination;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public void sendMessage(final String msg) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}});
}
}
下面是Spring消费者:
public class SpringJmsConsumer {
private JmsTemplate jmsTemplate;
private Destination destination;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public String receiveMessage() throws JMSException {
TextMessage textMessage =(TextMessage) jmsTemplate.receive(destination);
return textMessage.getText();
}
}
问题:当我启动生产者和 post 消息,然后启动消费者时,消费者不会读取旧消息,而只会读取消费者启动后 posted 的消息。任何人都可以帮助我如何制作这个持久订阅者,以便消费者读取队列中未确认的消息,而且我需要实现同步消费者而不是异步。
我已经尝试了所有可能的解决方案,但 none 有效。非常感谢任何帮助
如果您希望消费者在开始之前收到发送到该主题的消息,您有 2 个选择:
1.使用Activemq追溯消费者
Background A retroactive consumer is just a regular JMS Topic consumer
who indicates that at the start of a subscription every attempt should
be used to go back in time and send any old messages (or the last
message sent on that topic) that the consumer may have missed.
See the Subscription Recovery Policy for more detail.
您将消费者标记为追溯如下:
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
http://activemq.apache.org/retroactive-consumer.html
2。使用持久订阅者:
请注意,持久订阅者在第 2 次开始之前收到发送到主题的消息 运行
http://activemq.apache.org/manage-durable-subscribers.html
这可以通过 DefaultMessageListenerContainer 异步实现
<bean id="jmsContainer" destroy-method="shutdown"
class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="messageDestination" />
<property name="messageListener" ref="messageListenerAdapter" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
<property name="subscriptionDurable" value="true" />
<property name="clientId" value="UniqueClientId" />
</bean>
<bean id="messageListenerAdapter"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="springJmsConsumer" />
</bean>
<bean id="springJmsConsumer" class="SpringJmsConsumer">
</bean>
并更新您的消费者:
public class SpringJmsConsumer implements javax.jms.MessageListener {
public void onMessage(javax.jms.Message message) {
// treat message;
message.acknowledge();
}
}
更新使用
如果你想要一个同步持久订阅者,一个例子
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
public class SpringJmsConsumer {
private Connection conn;
private TopicSubscriber topicSubscriber;
public SpringJmsConsumer(ConnectionFactory connectionFactory, Topic destination ) {
conn = connectionFactory.createConnection("user", "password");
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
topicSubscriber = session.createDurableSubscriber(destination, "UniqueClientId");
conn.start();
}
public String receiveMessage() throws JMSException {
TextMessage textMessage = (TextMessage) topicSubscriber.receive();
return textMessage.getText();
}
}
并更新springJmsConsumer
<bean id="springJmsConsumer" class="SpringJmsConsumer">
<constructor-arg ref="connectionFactory" />
<constructor-arg ref="messageDestination" />
</bean>
请注意,连接失败不受此代码管理。
我有一个简单的 Spring JMS 应用程序 Producer/Subscriber 使用具有以下配置的 ActiveMQ :
应用程序上下文 xml:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="userName" value="user" />
<property name="password" value="password" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="messageQueue1" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE">
</property>
</bean>
<bean id="springJmsProducer" class="SpringJmsProducer">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="springJmsConsumer" class="SpringJmsConsumer">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
以下是Spring制作人
public class SpringJmsProducer {
private JmsTemplate jmsTemplate;
private Destination destination;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public void sendMessage(final String msg) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}});
}
}
下面是Spring消费者:
public class SpringJmsConsumer {
private JmsTemplate jmsTemplate;
private Destination destination;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public String receiveMessage() throws JMSException {
TextMessage textMessage =(TextMessage) jmsTemplate.receive(destination);
return textMessage.getText();
}
}
问题:当我启动生产者和 post 消息,然后启动消费者时,消费者不会读取旧消息,而只会读取消费者启动后 posted 的消息。任何人都可以帮助我如何制作这个持久订阅者,以便消费者读取队列中未确认的消息,而且我需要实现同步消费者而不是异步。
我已经尝试了所有可能的解决方案,但 none 有效。非常感谢任何帮助
如果您希望消费者在开始之前收到发送到该主题的消息,您有 2 个选择:
1.使用Activemq追溯消费者
Background A retroactive consumer is just a regular JMS Topic consumer who indicates that at the start of a subscription every attempt should be used to go back in time and send any old messages (or the last message sent on that topic) that the consumer may have missed.
See the Subscription Recovery Policy for more detail.
您将消费者标记为追溯如下:
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
http://activemq.apache.org/retroactive-consumer.html
2。使用持久订阅者:
请注意,持久订阅者在第 2 次开始之前收到发送到主题的消息 运行
http://activemq.apache.org/manage-durable-subscribers.html
这可以通过 DefaultMessageListenerContainer 异步实现
<bean id="jmsContainer" destroy-method="shutdown"
class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="messageDestination" />
<property name="messageListener" ref="messageListenerAdapter" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
<property name="subscriptionDurable" value="true" />
<property name="clientId" value="UniqueClientId" />
</bean>
<bean id="messageListenerAdapter"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="springJmsConsumer" />
</bean>
<bean id="springJmsConsumer" class="SpringJmsConsumer">
</bean>
并更新您的消费者:
public class SpringJmsConsumer implements javax.jms.MessageListener {
public void onMessage(javax.jms.Message message) {
// treat message;
message.acknowledge();
}
}
更新使用
如果你想要一个同步持久订阅者,一个例子
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
public class SpringJmsConsumer {
private Connection conn;
private TopicSubscriber topicSubscriber;
public SpringJmsConsumer(ConnectionFactory connectionFactory, Topic destination ) {
conn = connectionFactory.createConnection("user", "password");
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
topicSubscriber = session.createDurableSubscriber(destination, "UniqueClientId");
conn.start();
}
public String receiveMessage() throws JMSException {
TextMessage textMessage = (TextMessage) topicSubscriber.receive();
return textMessage.getText();
}
}
并更新springJmsConsumer
<bean id="springJmsConsumer" class="SpringJmsConsumer">
<constructor-arg ref="connectionFactory" />
<constructor-arg ref="messageDestination" />
</bean>
请注意,连接失败不受此代码管理。