从队列的待处理消息列表中获取消息时出现性能问题
Performance Issue while fetching message from pending message list of a queue
队列服务器:ActiveMQ,
协议:AMQP
API:Apache QPID 客户端 JMS 0.3.0
在正常情况下,它对我来说工作正常并在几毫秒内获取消息。
在以下情况下面临问题:
- 假设队列名称:TESTQUEUE
- 队列中有 500 条具有 JMSCorrelationID 的未决消息
- 正在使用 qpid API.
获取带有 JMSCorrelationID 的消息
检索消息大约需要 25 秒。
遇到这种情况怎么办?
// LOGIC: declaration
Connection connection = null;
MessageConsumer consumer = null;
MetaData objMetaData = new MetaData();
String strReturnData = "";
String user = "";
String password = "";
String host = "";
int port = 0;
ConnectionFactoryImpl factory = null;
Session session = null;
Destination destination = null;
Message msg = null;
try {
// LOGIC: set the connection details
user = objMetaData.getMetaData(CommonConstant.QUEUE_USERNAME);
password = objMetaData.getMetaData(CommonConstant.QUEUE_PASSWORD);
host = objMetaData.getMetaData(CommonConstant.QUEUE_HOST);
port = Integer.parseInt(objMetaData.getMetaData(
CommonConstant.QUEUE_PORT).trim());
// LOGIC: Initialize the connection factory with connection details
factory = new ConnectionFactoryImpl(host, port, user, password, strCorelationId);
// LOGIC: create connection
connection = factory.createConnection();
// LOGIC: create session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// LOGIC: initialize destination with queue name
destination = (Destination) session.createQueue(strQueueName);
// LOGIC: open connection
connection.start();
// LOGIC: set correlation id
consumer = session
.createConsumer(
destination,"JMSCorrelationID ='"+ strCorelationId + "'");
// LOGIC: get message
if (blnIsRequestModeSync) {
msg = consumer.receive((1000 * 60 * 2));
} else {
msg = consumer.receive(30000);
}
// LOGIC: check message type
if (msg instanceof TextMessage) {
// LOGIC: when text message
TextMessage txtmsg = (TextMessage) msg;
//System.out.println("TEXT MSG: " + txtmsg.getText());
strReturnData = txtmsg.getText();
} else {
// LOGIC: when object message
ObjectMessage objmsg = (ObjectMessage) msg;
//System.out.println("Object MSG:" + objmsg);
strReturnData = (null != objmsg ? objmsg.toString() : "");
}
没有足够的信息可以给你一个明确的答案,但这里有几件事可以发挥作用。
首先是您在队列中有多个消费者,并且根据消息到达的时间将消息预取给他们,以便第一个消费者持有第二个消费者感兴趣的消息,依此类推。您可以尝试减少客户端上的预取以排除这种情况。
第二个是队列深度足够大,您的客户端感兴趣的消息比 ActiveMQ 用来在内存中保存消息的默认页面大小更深,这意味着客户端不会收到它的消息,直到其他一些消费者拉出足够的消息,使您的慢速客户端感兴趣的消息在内存中可供选择。您可以在 per-destination 的基础上调整 ActiveMQ 使用的页面大小。但是,您应该警告自己,深度队列上的选择器是一种消息传递反模式,并且容易出现此类问题。
队列服务器:ActiveMQ, 协议:AMQP API:Apache QPID 客户端 JMS 0.3.0
在正常情况下,它对我来说工作正常并在几毫秒内获取消息。
在以下情况下面临问题:
- 假设队列名称:TESTQUEUE
- 队列中有 500 条具有 JMSCorrelationID 的未决消息
- 正在使用 qpid API. 获取带有 JMSCorrelationID 的消息
检索消息大约需要 25 秒。
遇到这种情况怎么办?
// LOGIC: declaration
Connection connection = null;
MessageConsumer consumer = null;
MetaData objMetaData = new MetaData();
String strReturnData = "";
String user = "";
String password = "";
String host = "";
int port = 0;
ConnectionFactoryImpl factory = null;
Session session = null;
Destination destination = null;
Message msg = null;
try {
// LOGIC: set the connection details
user = objMetaData.getMetaData(CommonConstant.QUEUE_USERNAME);
password = objMetaData.getMetaData(CommonConstant.QUEUE_PASSWORD);
host = objMetaData.getMetaData(CommonConstant.QUEUE_HOST);
port = Integer.parseInt(objMetaData.getMetaData(
CommonConstant.QUEUE_PORT).trim());
// LOGIC: Initialize the connection factory with connection details
factory = new ConnectionFactoryImpl(host, port, user, password, strCorelationId);
// LOGIC: create connection
connection = factory.createConnection();
// LOGIC: create session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// LOGIC: initialize destination with queue name
destination = (Destination) session.createQueue(strQueueName);
// LOGIC: open connection
connection.start();
// LOGIC: set correlation id
consumer = session
.createConsumer(
destination,"JMSCorrelationID ='"+ strCorelationId + "'");
// LOGIC: get message
if (blnIsRequestModeSync) {
msg = consumer.receive((1000 * 60 * 2));
} else {
msg = consumer.receive(30000);
}
// LOGIC: check message type
if (msg instanceof TextMessage) {
// LOGIC: when text message
TextMessage txtmsg = (TextMessage) msg;
//System.out.println("TEXT MSG: " + txtmsg.getText());
strReturnData = txtmsg.getText();
} else {
// LOGIC: when object message
ObjectMessage objmsg = (ObjectMessage) msg;
//System.out.println("Object MSG:" + objmsg);
strReturnData = (null != objmsg ? objmsg.toString() : "");
}
没有足够的信息可以给你一个明确的答案,但这里有几件事可以发挥作用。
首先是您在队列中有多个消费者,并且根据消息到达的时间将消息预取给他们,以便第一个消费者持有第二个消费者感兴趣的消息,依此类推。您可以尝试减少客户端上的预取以排除这种情况。
第二个是队列深度足够大,您的客户端感兴趣的消息比 ActiveMQ 用来在内存中保存消息的默认页面大小更深,这意味着客户端不会收到它的消息,直到其他一些消费者拉出足够的消息,使您的慢速客户端感兴趣的消息在内存中可供选择。您可以在 per-destination 的基础上调整 ActiveMQ 使用的页面大小。但是,您应该警告自己,深度队列上的选择器是一种消息传递反模式,并且容易出现此类问题。