ActiveMQ中BrokerService的用例是什么以及如何正确使用

What is the use case of BrokerService in ActiveMQ and how to use it correctly

我是 ActiveMQ 的新手。我试图通过检查 Apache 在 link:-

提供的示例代码来研究和检查它是如何工作的

http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

    public class Server implements MessageListener {
        private static int ackMode;
        private static String messageQueueName;
        private static String messageBrokerUrl;

        private Session session;
        private boolean transacted = false;
        private MessageProducer replyProducer;
        private MessageProtocol messageProtocol;

        static {
            messageBrokerUrl = "tcp://localhost:61616";
            messageQueueName = "client.messages";
            ackMode = Session.AUTO_ACKNOWLEDGE;
        }

        public Server() {
            try {
                //This message broker is embedded
                BrokerService broker = new BrokerService();
                broker.setPersistent(false);
                broker.setUseJmx(false);
                broker.addConnector(messageBrokerUrl);
                broker.start();
            } catch (Exception e) {
                System.out.println("Exception: "+e.getMessage());
                //Handle the exception appropriately
            }

            //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
            //is ready to handle messages
            this.messageProtocol = new MessageProtocol();
            this.setupMessageQueueConsumer();
        }

        private void setupMessageQueueConsumer() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                this.session = connection.createSession(this.transacted, ackMode);
                Destination adminQueue = this.session.createQueue(messageQueueName);

                //Setup a message producer to respond to messages from clients, we will get the destination
                //to send to from the JMSReplyTo header field from a Message
                this.replyProducer = this.session.createProducer(null);
                this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                //Set up a consumer to consume messages off of the admin queue
                MessageConsumer consumer = this.session.createConsumer(adminQueue);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                System.out.println("Exception: "+e.getMessage());
            }
        }

        public void onMessage(Message message) {
            try {
                TextMessage response = this.session.createTextMessage();
                if (message instanceof TextMessage) {
                    TextMessage txtMsg = (TextMessage) message;
                    String messageText = txtMsg.getText();
                    response.setText(this.messageProtocol.handleProtocolMessage(messageText));
                }

                //Set the correlation ID from the received message to be the correlation id of the response message
                //this lets the client identify which message this is a response to if it has more than
                //one outstanding message to the server
                response.setJMSCorrelationID(message.getJMSCorrelationID());

                //Send the response to the Destination specified by the JMSReplyTo field of the received message,
                //this is presumably a temporary queue created by the client
                this.replyProducer.send(message.getJMSReplyTo(), response);
            } catch (JMSException e) {
                System.out.println("Exception: "+e.getMessage());
            }
        }

        public static void main(String[] args) {
            new Server();
        }
    }

我对 messageBrokerUrl 的困惑 = "tcp://localhost:61616";您知道默认情况下,ActiveMQ 服务 运行ning 在端口 61616 上。为什么这个例子选择相同的端口。如果我尝试 运行 代码将 eception 视为: 异常:无法绑定到服务器套接字:tcp://localhost:61616 由于:java.net.BindException:地址已在使用中:JVM_Bind

也许如果我更改端口号,我可以执行代码。

请告诉我示例中为什么会这样以及如何使用 BrokerService。

此示例中的 BrokerService 正在尝试创建内存中的 ActiveMQ 代理以供示例使用。鉴于您看到的错误,我猜您已经在绑定到端口 61616 的机器上安装了一个 ActiveMQ 代理 运行ning,因为这是默认端口,因此两者存在冲突。您可以停止外部代理和 运行 示例,或者将示例修改为不 运行 嵌入式代理而仅依赖于您的外部代理实例。

嵌入式代理非常适合单元测试或创建不需要用户安装代理和 运行ning 的示例。