ActiveMQ中BrokerService的用例是什么以及如何正确使用
What is the use case of BrokerService in ActiveMQ and how to use it correctly
我是 ActiveMQ 的新手。我试图通过检查 Apache 在 link:-
提供的示例代码来研究和检查它是如何工作的
http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
public class Server implements MessageListener {
private static int ackMode;
private static String messageQueueName;
private static String messageBrokerUrl;
private Session session;
private boolean transacted = false;
private MessageProducer replyProducer;
private MessageProtocol messageProtocol;
static {
messageBrokerUrl = "tcp://localhost:61616";
messageQueueName = "client.messages";
ackMode = Session.AUTO_ACKNOWLEDGE;
}
public Server() {
try {
//This message broker is embedded
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
} catch (Exception e) {
System.out.println("Exception: "+e.getMessage());
//Handle the exception appropriately
}
//Delegating the handling of messages to another class, instantiate it before setting up JMS so it
//is ready to handle messages
this.messageProtocol = new MessageProtocol();
this.setupMessageQueueConsumer();
}
private void setupMessageQueueConsumer() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(this.transacted, ackMode);
Destination adminQueue = this.session.createQueue(messageQueueName);
//Setup a message producer to respond to messages from clients, we will get the destination
//to send to from the JMSReplyTo header field from a Message
this.replyProducer = this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
} catch (JMSException e) {
System.out.println("Exception: "+e.getMessage());
}
}
public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//Set the correlation ID from the received message to be the correlation id of the response message
//this lets the client identify which message this is a response to if it has more than
//one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID());
//Send the response to the Destination specified by the JMSReplyTo field of the received message,
//this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
System.out.println("Exception: "+e.getMessage());
}
}
public static void main(String[] args) {
new Server();
}
}
我对 messageBrokerUrl 的困惑 = "tcp://localhost:61616";您知道默认情况下,ActiveMQ 服务 运行ning 在端口 61616 上。为什么这个例子选择相同的端口。如果我尝试 运行 代码将 eception 视为:
异常:无法绑定到服务器套接字:tcp://localhost:61616 由于:java.net.BindException:地址已在使用中:JVM_Bind
也许如果我更改端口号,我可以执行代码。
请告诉我示例中为什么会这样以及如何使用 BrokerService。
此示例中的 BrokerService 正在尝试创建内存中的 ActiveMQ 代理以供示例使用。鉴于您看到的错误,我猜您已经在绑定到端口 61616 的机器上安装了一个 ActiveMQ 代理 运行ning,因为这是默认端口,因此两者存在冲突。您可以停止外部代理和 运行 示例,或者将示例修改为不 运行 嵌入式代理而仅依赖于您的外部代理实例。
嵌入式代理非常适合单元测试或创建不需要用户安装代理和 运行ning 的示例。
我是 ActiveMQ 的新手。我试图通过检查 Apache 在 link:-
提供的示例代码来研究和检查它是如何工作的http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
public class Server implements MessageListener {
private static int ackMode;
private static String messageQueueName;
private static String messageBrokerUrl;
private Session session;
private boolean transacted = false;
private MessageProducer replyProducer;
private MessageProtocol messageProtocol;
static {
messageBrokerUrl = "tcp://localhost:61616";
messageQueueName = "client.messages";
ackMode = Session.AUTO_ACKNOWLEDGE;
}
public Server() {
try {
//This message broker is embedded
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
} catch (Exception e) {
System.out.println("Exception: "+e.getMessage());
//Handle the exception appropriately
}
//Delegating the handling of messages to another class, instantiate it before setting up JMS so it
//is ready to handle messages
this.messageProtocol = new MessageProtocol();
this.setupMessageQueueConsumer();
}
private void setupMessageQueueConsumer() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(this.transacted, ackMode);
Destination adminQueue = this.session.createQueue(messageQueueName);
//Setup a message producer to respond to messages from clients, we will get the destination
//to send to from the JMSReplyTo header field from a Message
this.replyProducer = this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
} catch (JMSException e) {
System.out.println("Exception: "+e.getMessage());
}
}
public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//Set the correlation ID from the received message to be the correlation id of the response message
//this lets the client identify which message this is a response to if it has more than
//one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID());
//Send the response to the Destination specified by the JMSReplyTo field of the received message,
//this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
System.out.println("Exception: "+e.getMessage());
}
}
public static void main(String[] args) {
new Server();
}
}
我对 messageBrokerUrl 的困惑 = "tcp://localhost:61616";您知道默认情况下,ActiveMQ 服务 运行ning 在端口 61616 上。为什么这个例子选择相同的端口。如果我尝试 运行 代码将 eception 视为: 异常:无法绑定到服务器套接字:tcp://localhost:61616 由于:java.net.BindException:地址已在使用中:JVM_Bind
也许如果我更改端口号,我可以执行代码。
请告诉我示例中为什么会这样以及如何使用 BrokerService。
此示例中的 BrokerService 正在尝试创建内存中的 ActiveMQ 代理以供示例使用。鉴于您看到的错误,我猜您已经在绑定到端口 61616 的机器上安装了一个 ActiveMQ 代理 运行ning,因为这是默认端口,因此两者存在冲突。您可以停止外部代理和 运行 示例,或者将示例修改为不 运行 嵌入式代理而仅依赖于您的外部代理实例。
嵌入式代理非常适合单元测试或创建不需要用户安装代理和 运行ning 的示例。