从队列的待处理消息列表中获取消息时出现性能问题

Performance Issue while fetching message from pending message list of a queue

队列服务器:ActiveMQ, 协议:AMQP API:Apache QPID 客户端 JMS 0.3.0

在正常情况下,它对我来说工作正常并在几毫秒内获取消息。

在以下情况下面临问题:

  1. 假设队列名称:TESTQUEUE
  2. 队列中有 500 条具有 JMSCorrelationID 的未决消息
  3. 正在使用 qpid API.
  4. 获取带有 JMSCorrelationID 的消息

检索消息大约需要 25 秒。

遇到这种情况怎么办?

    // LOGIC: declaration
    Connection connection = null;
    MessageConsumer consumer = null;
    MetaData objMetaData = new MetaData();
    String strReturnData = "";
    String user = "";
    String password = "";
    String host = "";
    int port = 0;
    ConnectionFactoryImpl factory = null;
    Session session = null;
    Destination destination = null;
    Message msg = null;

    try {

        // LOGIC: set the connection details
        user = objMetaData.getMetaData(CommonConstant.QUEUE_USERNAME);
        password = objMetaData.getMetaData(CommonConstant.QUEUE_PASSWORD);
        host = objMetaData.getMetaData(CommonConstant.QUEUE_HOST);
        port = Integer.parseInt(objMetaData.getMetaData(
                CommonConstant.QUEUE_PORT).trim());

        // LOGIC: Initialize the connection factory with connection details
        factory = new ConnectionFactoryImpl(host, port, user, password, strCorelationId);

        // LOGIC: create connection
        connection = factory.createConnection();

        // LOGIC: create session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // LOGIC: initialize destination with queue name
        destination = (Destination) session.createQueue(strQueueName);

        // LOGIC: open connection
        connection.start();

        // LOGIC: set correlation id
        consumer = session
                .createConsumer(
                        destination,"JMSCorrelationID ='"+ strCorelationId + "'");

        // LOGIC: get message
        if (blnIsRequestModeSync) {
            msg = consumer.receive((1000 * 60 * 2));
        } else {
            msg = consumer.receive(30000);
        }

        // LOGIC: check message type
        if (msg instanceof TextMessage) {

            // LOGIC: when text message
            TextMessage txtmsg = (TextMessage) msg;
            //System.out.println("TEXT MSG: " + txtmsg.getText());
            strReturnData = txtmsg.getText();
        } else {
            // LOGIC: when object message
            ObjectMessage objmsg = (ObjectMessage) msg;
            //System.out.println("Object  MSG:" + objmsg);
            strReturnData = (null != objmsg ? objmsg.toString() : "");
        }

没有足够的信息可以给你一个明确的答案,但这里有几件事可以发挥作用。

首先是您在队列中有多个消费者,并且根据消息到达的时间将消息预取给他们,以便第一个消费者持有第二个消费者感兴趣的消息,依此类推。您可以尝试减少客户端上的预取以排除这种情况。

第二个是队列深度足够大,您的客户端感兴趣的消息比 ActiveMQ 用来在内存中保存消息的默认页面大小更深,这意味着客户端不会收到它的消息,直到其他一些消费者拉出足够的消息,使您的慢速客户端感兴趣的消息在内存中可供选择。您可以在 per-destination 的基础上调整 ActiveMQ 使用的页面大小。但是,您应该警告自己,深度队列上的选择器是一种消息传递反模式,并且容易出现此类问题。