如何从其他应用程序订阅 Spring Boot JMS 主题

How to subscribe to a Spring Boot JMS topic from another app

我有 2 个应用程序,第一个应用程序启动了一个 ActiveMQ 代理(https://spring.io/guides/gs/messaging-jms/)。

我想在第二个应用程序中订阅第一个应用程序的主题。

如何在不启动 ActiveMQ 服务器的情况下执行此操作?


可能的解决方案:

服务器应用程序项目

import java.time.LocalDateTime;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.ui.ModelMap;

/**
 * Server-side Spring Boot application.
 *
 * <p>Declares an ActiveMQ {@link BrokerService} bean with a TCP connector on port 4444,
 * the JMS infrastructure beans (connection factory, listener container factory and a
 * JSON message converter), and a scheduled job that sends the current timestamp to the
 * {@code "messages"} destination.
 */
@SpringBootApplication
@EnableJms
@EnableScheduling
public class JsmServerApplication {

    /** Template used by {@link #run()} to convert and send the payload. */
    @Autowired
    JmsTemplate jmsTemplate;

    /**
     * Builds the ActiveMQ broker definition.
     *
     * @return a broker named {@code primary-broker} with JMX enabled and a TCP connector
     * @throws Exception if the transport connector cannot be added
     */
    @Bean
    public BrokerService broker() throws Exception {
        final BrokerService brokerService = new BrokerService();
        // Bind to every network interface so that other hosts can reach the broker.
        brokerService.addConnector("tcp://0.0.0.0:4444");
        brokerService.setBrokerName("primary-broker");
        brokerService.setUseJmx(true);
        return brokerService;
    }

    /**
     * Connection factory pointing at the TCP connector declared by {@link #broker()}.
     *
     * @return an ActiveMQ connection factory for {@code tcp://localhost:4444}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final String brokerUrl = "tcp://localhost:4444";
        return new ActiveMQConnectionFactory(brokerUrl);
    }

    /**
     * Listener container factory wired with the given connection factory and the JSON
     * message converter.
     *
     * @param connectionFactory the JMS connection factory to use
     * @return the configured listener container factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        final MessageConverter jsonConverter = jacksonJmsMessageConverter();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(jsonConverter);
        return containerFactory;
    }

    /**
     * Jackson-based converter that writes payloads as text messages and stores the
     * payload type id in the {@code _type} message property.
     *
     * @return the configured message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("_type");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(JsmServerApplication.class, args);
    }

    /**
     * Scheduled job (cron {@code *&#47;5 * * * * ?}): prints and sends a map holding the
     * current local date-time under the key {@code "now"}.
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void run() {
        final String timestamp = LocalDateTime.now().toString();
        final ModelMap payload = new ModelMap("now", timestamp);
        System.out.println("Sending: " + payload);
        jmsTemplate.convertAndSend("messages", payload);
    }

}

客户端应用程序项目

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.ui.ModelMap;

/**
 * Client-side Spring Boot application.
 *
 * <p>Connects to an ActiveMQ broker at {@code tcp://localhost:4444} and prints every
 * message received from the {@code "messages"} destination.
 */
@SpringBootApplication
@EnableJms
public class JsmClientApplication {

    /**
     * Connection factory for the remote broker.
     *
     * @return an ActiveMQ connection factory for {@code tcp://localhost:4444}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final String brokerUrl = "tcp://localhost:4444";
        return new ActiveMQConnectionFactory(brokerUrl);
    }

    /**
     * Listener container factory referenced by name from {@link #msg(ModelMap)}.
     *
     * @param connectionFactory the JMS connection factory to use
     * @return the configured listener container factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        final MessageConverter jsonConverter = jacksonJmsMessageConverter();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(jsonConverter);
        return containerFactory;
    }

    /**
     * Jackson-based converter that handles text messages and reads the payload type id
     * from the {@code _type} message property.
     *
     * @return the configured message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("_type");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Receives a converted message from the {@code "messages"} destination and prints it
     * to standard output.
     *
     * @param msg the payload converted by the configured message converter
     */
    @JmsListener(destination = "messages", containerFactory = "jmsListenerContainerFactory")
    public void msg(ModelMap msg) {
        System.out.println(msg);
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(JsmClientApplication.class, args);
    }

}

这是正确的做法吗?


这样解决了:

http://javasampleapproach.com/java-integration/activemq-work-spring-jms-activemq-topic-publisher-subcribers-pattern-using-springboot

请参阅 了解如何监听 tcp 端口而不是 vm:// 传输。

您可以使用 MessageConsumer 来消费消息,如下面的代码所示:

/**
 * Subscribes to the JMS topic named {@code "topic"} and handles incoming messages until
 * the main thread is interrupted.
 *
 * <p>NOTE(review): {@code url} is not declared in this snippet; it is assumed to be a
 * broker URL (for example {@code tcp://localhost:4444}) defined by the enclosing class.
 *
 * @param args command-line arguments (unused)
 * @throws JMSException if the connection, session or consumer cannot be created, started
 *     or closed
 */
public static void main(String[] args) throws JMSException {
    // Getting JMS connection from the server
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    Connection connection = connectionFactory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic");
        MessageConsumer consumer = session.createConsumer(topic);

        MessageListener listener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // do operations
                    System.out.println("Received message: " + message.getJMSMessageID());
                } catch (JMSException e) {
                    // Report the failure instead of silently dropping it.
                    System.err.println("Failed to process message: " + e);
                }
            }
        };
        consumer.setMessageListener(listener);

        // Start delivery only after the listener is registered.
        connection.start();

        // The listener is called asynchronously, so the connection must stay open.
        // Joining the current thread blocks here until this thread is interrupted.
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    } finally {
        // Closing the connection also closes its sessions and consumers.
        connection.close();
    }
}

由于您使用的是 ActiveMQConnectionFactory,您可以按如下方式设置代理

// Embedded, non-persistent broker with a TCP connector on localhost:4444.
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:4444");
broker.setPersistent(false);
// The connector only accepts connections once the broker has been started.
broker.start();

// Connection factory using the vm:// transport with persistence disabled.
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

如果您并非必须使用 ActiveMQ,也可以使用 Kafka 来实现同样的功能。Kafka 为您提供了一个高度可扩展的分布式消息总线,并且具有简单的 API。

https://kafka.apache.org/quickstart

我不清楚您有哪些限制,只是想让您了解一下 Kafka。不过,上面的代码应该可以帮助您理解如何订阅主题并消费其中的消息。