Spring Boot + ActiveMQ 以编程方式即时订阅主题
Spring Boot + ActiveMQ programmatically subscribe to topics on the fly
我正在尝试实现一项功能,我的侦听器 class 可以 subscribe/unsubscribe JMS 主题。
经过一些研究,没有找到一个明确的方法来做到这一点,我想出了两个解决方案:
- 有一个监听器 class,它包含一个字符串主题名称列表,并定期 运行 通过所有这些它应该被订阅的主题和 运行 阻塞
jmsTemplate.receiveAndConvert(topicName)
在每个(可能将阻塞操作本身委托给工作池)。
Subscribing/Unsubscribing 从主题中删除主题名称就像从列表中删除主题名称一样简单。
有一个工厂class,它将为应用程序需要订阅的每个主题构建一个新的侦听器,使用如下方法:
public MessageListenerContainer createListener(String topic) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestinationName(topic);
container.setMessageListener(new MyListenerClass());
return container;
}
第二个选项对我来说似乎更优雅,但我不确定听众的生命周期。我浏览了一些 spring boot 的 jms 和 activemq 模块的源代码并注意到 DefaultMessageListenerContainer
有方法 initialize()
和 start()
虽然我不确定 how/if我需要使用这些,因为我可以找到以这种方式构建的 MessageListenerContainer
的唯一方法是作为 Bean
声明。
此外,当取消订阅主题时,因此想要销毁与其关联的侦听器容器,除了调用 stop(callback)
方法之外,是否还需要做更多的事情?
我对 JMS/ActiveMQ 及其 Spring 集成的理解是否正确,因为没有更简单的方法来实现这一点?我的做法正确吗?
恕我直言,只要你
- 从spring获取connectionFactory(不是一个
PooledConnectionFactory
)
- 正确调用
initialise()
和 start()
订阅和 stop()
取消订阅
- 不要期望在异常情况下重新传递消息
第二种方法应该没问题
要在运行时注册新的 JmsListenerEndpoint
,您必须完成 2 个步骤
1 创建自定义 MessageListener
服务
@Service
public class CustomMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("[Custom message listener] " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2 注册新端点,使用 JmsListenerEndpointRegistry
@Service
public class MessageListenersService {
@Autowired
private JmsListenerEndpointRegistry registry;
@Autowired
@Qualifier("containerFactory")
private DefaultJmsListenerContainerFactory factory;
public void registerListener(String queueNameToListen, MessageListener listener) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("ep-"+listener.hashCode()); // ID is mandatory
endpoint.setMessageListener(listener);
endpoint.setDestination(queueNameToListen);
registry.registerListenerContainer(endpoint, factory, true);
}
}
使用它
private static final String CUSTOM_DESTINATION = "queue-42";
@Autowired
MessageListenersService messageListenersService;
@Autowired
CustomMessageListener customMessageListener;
@Autowired
JmsTemplate jmsTemplate;
@PostConstruct
public void createCustomListener() throws InterruptedException {
messageListenersService.registerListener(CUSTOM_DESTINATION, customMessageListener);
jmsTemplate.send(CUSTOM_DESTINATION, session -> session.createTextMessage("hello world"));
// wait your message:
TimeUnit.SECONDS.sleep(1);
}
我正在尝试实现一项功能,我的侦听器 class 可以 subscribe/unsubscribe JMS 主题。 经过一些研究,没有找到一个明确的方法来做到这一点,我想出了两个解决方案:
- 有一个监听器 class,它包含一个字符串主题名称列表,并定期 运行 通过所有这些它应该被订阅的主题和 运行 阻塞
jmsTemplate.receiveAndConvert(topicName)
在每个(可能将阻塞操作本身委托给工作池)。 Subscribing/Unsubscribing 从主题中删除主题名称就像从列表中删除主题名称一样简单。 有一个工厂class,它将为应用程序需要订阅的每个主题构建一个新的侦听器,使用如下方法:
public MessageListenerContainer createListener(String topic) { DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setDestinationName(topic); container.setMessageListener(new MyListenerClass()); return container;
}
第二个选项对我来说似乎更优雅,但我不确定听众的生命周期。我浏览了一些 spring boot 的 jms 和 activemq 模块的源代码并注意到 DefaultMessageListenerContainer
有方法 initialize()
和 start()
虽然我不确定 how/if我需要使用这些,因为我可以找到以这种方式构建的 MessageListenerContainer
的唯一方法是作为 Bean
声明。
此外,当取消订阅主题时,因此想要销毁与其关联的侦听器容器,除了调用 stop(callback)
方法之外,是否还需要做更多的事情?
我对 JMS/ActiveMQ 及其 Spring 集成的理解是否正确,因为没有更简单的方法来实现这一点?我的做法正确吗?
恕我直言,只要你
- 从spring获取connectionFactory(不是一个
PooledConnectionFactory
) - 正确调用
initialise()
和start()
订阅和stop()
取消订阅 - 不要期望在异常情况下重新传递消息
第二种方法应该没问题
要在运行时注册新的 JmsListenerEndpoint
,您必须完成 2 个步骤
1 创建自定义 MessageListener
服务
@Service
public class CustomMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("[Custom message listener] " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2 注册新端点,使用 JmsListenerEndpointRegistry
@Service
public class MessageListenersService {
@Autowired
private JmsListenerEndpointRegistry registry;
@Autowired
@Qualifier("containerFactory")
private DefaultJmsListenerContainerFactory factory;
public void registerListener(String queueNameToListen, MessageListener listener) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("ep-"+listener.hashCode()); // ID is mandatory
endpoint.setMessageListener(listener);
endpoint.setDestination(queueNameToListen);
registry.registerListenerContainer(endpoint, factory, true);
}
}
使用它
private static final String CUSTOM_DESTINATION = "queue-42";
@Autowired
MessageListenersService messageListenersService;
@Autowired
CustomMessageListener customMessageListener;
@Autowired
JmsTemplate jmsTemplate;
@PostConstruct
public void createCustomListener() throws InterruptedException {
messageListenersService.registerListener(CUSTOM_DESTINATION, customMessageListener);
jmsTemplate.send(CUSTOM_DESTINATION, session -> session.createTextMessage("hello world"));
// wait your message:
TimeUnit.SECONDS.sleep(1);
}