JMS 回调返回重复的 JMSMessageID

JMS callback returns duplicate JMSMessageID

我目前正在尝试用纯 Java 将一批简单的消息发送到队列。

/**
 * Sends {@code message} as a text message to the queue named {@code queue}.
 *
 * A fresh {@link JMSContext} is created for the call and closed by the
 * try-with-resources statement when the method exits.
 *
 * @param message the text payload of the message
 * @param queue   the name of the destination queue
 * @return a reference holding the message object that was handed to the producer
 * @throws SipJmsException if anything fails while creating or sending the message
 */
public AtomicReference<Message> doSend(String message, String queue) {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
        final TextMessage outgoing = jmsContext.createTextMessage(message);
        final AtomicReference<Message> holder = new AtomicReference<Message>();
        holder.set(outgoing);

        log.info("Sending message to queue {}", queue);
        jmsContext.createProducer().send(createDestination(jmsContext, queue), outgoing);

        // The id is read back from the message object only after send() has returned.
        final String assignedId = holder.get().getJMSMessageID();
        log.info("Message sent to queue {}, messageId provided {}", queue, assignedId);
        return holder;
    } catch (Exception e) {
        log.error("Failed to send message to queue",e);
        throw new SipJmsException("Failed to send message to queue", e);
    }
}

/**
 * Obtains the destination for the queue named {@code queue} from the given context.
 *
 * @param context the JMS context used to create the queue object
 * @param queue   the queue name
 * @return the queue object returned by {@code context.createQueue(queue)}
 */
private Destination createDestination(JMSContext context, String queue) {
    log.debug("Creating destination queue {} connection",queue);
    final Destination destination = context.createQueue(queue);
    return destination;
}

我连续发送了 N 条消息,日志显示生成的 JMSMessageId 始终相同。

[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d81619824
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d83619824

据我所知,JMSMessageId 应该是唯一的,它的冲突会导致问题。

O'Reilly 的书中指出:

JMSMessageID 是一个字符串值,用于唯一标识一条消息。标识符的唯一性取决于供应商。 JMSMessageID 对于需要对消息进行唯一索引的 JMS 消费者应用程序中的历史存储库很有用。 与 JMSCorrelationID 结合使用,JMSMessageID 也可用于关联消息: String messageid = message.getJMSMessageID();

那么,为什么 MessageId 不是唯一的?(甚至在应用程序的多次运行之间也相同。)

final AtomicReference<Message> msg = new AtomicReference<>();

为什么要使用 "final"?删除它并重试。

消息 ID 是唯一的,我用 * 标记了不同的那一位:
414d5120444556494d53514d20202020551c3f5d81619824
                                         *
414d5120444556494d53514d20202020551c3f5d83619824

我创建了一个简单的 JMS 程序,它将 5 条消息放入一个队列,每次放入后,它都会输出 JMSMessageId。

示例输出:

2019/08/13 19:15:18.824 MQTestJMS11x5: testConn: successfully connected.
2019/08/13 19:15:18.845 MQTestJMS11x5: testConn: successfully opened TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: Sending request to queue:///TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: 
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201102
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201103
2019/08/13 19:15:18.888 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201104
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201105
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201106
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Closed session
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Stopped connection
2019/08/13 19:15:18.893 MQTestJMS11x5: testConn: Closed connection

请注意,每条消息 ID 都是唯一的。

这是生成输出的 JMS 程序:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import javax.jms.*;

import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;

/**
 * Program Name
 *  MQTestJMS11x5
 *
 * Description
 *  This java JMS class will connect to a remote queue manager and put 5 messages to a queue.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTestJMS11x5
{
   // NOTE: SimpleDateFormat is not thread-safe; this program only logs from the main thread.
   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   /** Command-line switches (e.g. "-h") mapped to the value that followed them. */
   private final Hashtable<String,String> params;

   /** Connection factory configured by {@link #init(String[])}; null until then. */
   private MQQueueConnectionFactory mqQCF = null;


   /**
    * The constructor
    */
   public MQTestJMS11x5()
   {
      super();
      params = new Hashtable<String,String>();
   }

   /**
    * Make sure the required parameters are present and that the port (-p) is an integer.
    * @return true if every required switch was supplied and -p parses as an int
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt(params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ variables.
    * @param args switch/value pairs, e.g. {@code -m MQA1 -h 127.0.0.1 ...}
    * @throws IllegalArgumentException if the arguments are not switch/value pairs, a required
    *         switch is missing, or the connection factory cannot be configured (the underlying
    *         exception is attached as the cause)
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      if (args.length == 0 || (args.length % 2) != 0)
      {
         throw new IllegalArgumentException("Expected switch/value pairs but got " + args.length + " argument(s)");
      }

      for (int i = 0; i < args.length; i += 2)
      {
         params.put(args[i], args[i + 1]);
      }

      if (!allParamsPresent())
      {
         throw new IllegalArgumentException("A required parameter is missing or the port is not a number");
      }

      try
      {
         mqQCF = new MQQueueConnectionFactory();
         mqQCF.setQueueManager(params.get("-m"));
         mqQCF.setHostName(params.get("-h"));
         mqQCF.setChannel(params.get("-c"));
         mqQCF.setTransportType(WMQConstants.WMQ_CM_CLIENT);
         try
         {
            mqQCF.setPort(Integer.parseInt(params.get("-p")));
         }
         catch (NumberFormatException e)
         {
            // Defensive fallback only: allParamsPresent() has already checked that -p parses.
            mqQCF.setPort(1414);
         }
      }
      catch (JMSException e)
      {
         MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
         // Keep the original failure attached instead of discarding it.
         throw new IllegalArgumentException("Unable to configure the connection factory", e);
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
         throw new IllegalArgumentException("Unable to configure the connection factory", e);
      }
   }

   /**
    * Connect to the queue manager, open the queue named by -q and send the test messages.
    * Failures are logged here rather than propagated; the session and connection are
    * always closed before returning.
    * @return true if {@link #sendMsg(QueueSession, Queue)} completed without an exception,
    *         false if connecting, opening the queue or sending failed
    */
   private boolean testConn()
   {
      QueueConnection conn = null;
      QueueSession session = null;
      Queue myQ = null;
      boolean sent = false;

      try
      {
         conn = mqQCF.createQueueConnection(params.get("-u"), params.get("-x"));
         conn.start();

         session = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
         MQTestJMS11x5.logger("successfully connected.");

         myQ = session.createQueue(params.get("-q"));
         MQTestJMS11x5.logger("successfully opened "+ params.get("-q"));

         MQDestination mqd = (MQDestination) myQ;
         mqd.setTargetClient(WMQConstants.WMQ_CLIENT_JMS_COMPLIANT);

         sendMsg( session, myQ);
         sent = true;
      }
      catch (JMSException e)
      {
         MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
      }
      finally
      {
         // Each cleanup step has its own try so that one failure does not skip the others.
         try
         {
            if (session != null)
            {
               session.close();
               MQTestJMS11x5.logger("Closed session");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("session.close() : " + ex.getLocalizedMessage());
         }

         try
         {
            if (conn != null)
            {
               conn.stop();
               MQTestJMS11x5.logger("Stopped connection");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("connection.stop() : " + ex.getLocalizedMessage());
         }

         try
         {
            if (conn != null)
            {
               conn.close();
               MQTestJMS11x5.logger("Closed connection");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("connection.close() : " + ex.getLocalizedMessage());
         }
      }

      return sent;
   }

   /**
    * Send 5 text messages to a queue, logging the JMSMessageID of each one after it is sent.
    * @param session the session used to create the sender and the messages
    * @param myQ the queue to send to
    * @throws JMSException if creating the sender, building a message or sending fails
    */
   private void sendMsg(QueueSession session, Queue myQ) throws JMSException
   {
      QueueSender sender = null;

      try
      {
         MQTestJMS11x5.logger("Sending request to " + myQ.getQueueName());
         MQTestJMS11x5.logger("");

         sender = session.createSender(myQ);

         for (int i=0; i < 5; i++)
         {
            // A new message object is created for every send.
            TextMessage msg = session.createTextMessage();
            msg.setText("This is test message # " + (i+1));

            sender.send(msg);

            MQTestJMS11x5.logger("Sent message: MessageId="+msg.getJMSMessageID());
         }
      }
      finally
      {
         try
         {
            if (sender != null)
               sender.close();
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("sender.close() : " + ex.getLocalizedMessage());
         }
      }
   }

   /**
    * A simple logger method. Prints a timestamp, the simple class name and method name of
    * the caller, and the supplied text to standard output.
    * @param data the text to log
    */
   public static void logger(String data)
   {
      // Index 2 is the caller of logger(): [0] is getStackTrace, [1] is logger itself.
      StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
      String className = caller.getClassName();

      // Remove the package info.
      int lastDot = (className != null) ? className.lastIndexOf('.') : -1;
      if (lastDot != -1)
         className = className.substring(lastDot+1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+caller.getMethodName()+": "+data);
   }

   /**
    * mainline. Exits with status 0 only when the messages were sent; exits with 1 on bad
    * arguments or when connecting/sending failed.
    * @param args command-line switch/value pairs
    */
   public static void main(String[] args)
   {
      MQTestJMS11x5 write = new MQTestJMS11x5();
      boolean sent = false;

      try
      {
         write.init(args);
         sent = write.testConn();
      }
      catch (IllegalArgumentException e)
      {
         MQTestJMS11x5.logger("Usage: java MQTestJMS11x5 -m QueueManagerName -h host -p port -c channel -q JMS_Queue_Name -u UserID -x Password");
         System.exit(1);
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         System.exit(1);
      }

      // testConn() logs its own failures, so the exit status is the only way to report them.
      System.exit(sent ? 0 : 1);
   }
}