Connection.start JMS MessageProducer 不需要,但 MessageConsumer 需要

Connection.start is not needed for JMS MessageProducer but needed for MessageConsumer

A - 问题

我知道有一个类似的问题,但在 SO 中不一样。

我试图了解 JMS 中 MessageProducerMessageConsumer 背后发生的事情。使用 ActiveMQ 的实现,我编写了一个简单的 MessageProducer 示例来将消息发送到队列,以及一个 MessageConsumer 示例从队列中消费消息,而 运行 ActiveMQ 在本地。

Connection#start 方法需要将消息发送到队列。具体调试点如下。 Connection#start 触发 ActiveMQSession#start 方法。当调用 Connection#start 时触发此方法。在 org.apache.activemq.ActiveMQSession#start;

查看以下调试点

问题是,Connection#startMessageProducer 上并不明确需要,但在 MessageConsumer[ 上需要=66=]。但是,对于这两个示例,我们都需要清除资源(sessionconnection)。我意识到,如果我删除生产者的 Connection#start 方法,代码将执行,调试点将不会被触发(甚至不会在幕后),我会在队列。但是如果我删除消费者的 Connection#start 方法,代码将不会执行,这就是问题所在,为什么 MessageProducer 中不需要它并且代码成功执行但需要 消息消费者?还有为什么我们甚至不为 MessageProducer 使用 Connection#start 甚至我们需要关闭连接以刷新资源.好像代码有味道。

我看到字段 started 是一个 AtomicBoolean。我不是并发和多线程方面的专家,所以,可能有人可以解释为什么对于 MessageProducer,Connection#start 不是强制性的;

B - 使用 ActiveMQ 的 JMS MessageProducer 示例代码

package com.bzdgn.jms.Whosebug;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSSendMessageToQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        String messageContent = "Hello Whosebug!";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // Send Message to Queue
        Queue queue = session.createQueue(queueName);
        TextMessage msg = session.createTextMessage(messageContent);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.send(msg);
        
        // Clear resources
        session.close();
        connection.close();
    }

}

C - 带有 ActiveMQ 的 JMS MessageConsumer 示例代码

package com.bzdgn.jms.Whosebug;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumeMessageFromQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // Consume Message from the Queue
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        
        connection.start();
        
        Message message = messageConsumer.receive(500);
        
        if ( message != null ) {
            if ( message instanceof TextMessage ) {
                TextMessage textMessage = (TextMessage) message;
                String messageContent = textMessage.getText();
                System.out.println("Message Content: " + messageContent);
            }
        } else {
            System.out.println("No message in the queue: " + queueName);
        }
        
        // Clear resources
        session.close();
        connection.close();
    }
    
}

D - 配置和 Maven 依赖

JDK 版本是 1.8,我是 运行 ActiveMQ 5.15.12 客户端也使用相同的版本依赖性;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.12</version>
</dependency>

此处的行为由 JMS 规范规定。简而言之,javax.jms.Connection.start() 适用于消费者而非生产者。它告诉代理开始向与连接关联的消费者传递消息。 JavaDoc for Connection 是这样说的:

It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers have been created). At that point, the client calls the connection's start method, and messages begin arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from asynchronous message delivery while the client is still in the process of setting itself up.

A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared to handle asynchronous message delivery while they are still in the process of setting up.

start()方法对生产者没有影响。您看到了预期的行为。

值得注意的是,如果您使用作为 JMS 2 一部分的简化 API,此行为会有些不同。如果您使用 JMSContext 创建 JMSConsumer 然后消息传递自动开始。明确地说,ActiveMQ 5.x 没有实现 JMS 2,但是 ActiveMQ Artemis 实现了。