JMS 回调返回重复的 JMSMessageID
JMS callback returns duplicate JMSMessageID
我目前正在尝试使用纯 Java 将一批简单的消息发送到队列。
/**
 * Sends {@code message} as a JMS text message to the queue named {@code queue}.
 *
 * <p>A new {@code JMSContext} is obtained from {@code connectionFactory} on every call and is
 * closed by the try-with-resources statement when the method exits.
 *
 * @param message text placed in the body of the message
 * @param queue name of the destination queue
 * @return a reference holding the message object that was handed to the producer
 * @throws SipJmsException wrapping any exception raised while creating, sending or logging the message
 */
public AtomicReference<Message> doSend(String message, String queue){
    try (JMSContext jmsContext = connectionFactory.createContext()) {
        final TextMessage outbound = jmsContext.createTextMessage(message);
        log.info("Sending message to queue {}", queue);
        // Producer is created first, then the destination, exactly as in the single-expression form.
        jmsContext.createProducer().send(createDestination(jmsContext, queue), outbound);
        // The ID is read only after send() has returned.
        final String assignedId = outbound.getJMSMessageID();
        log.info("Message sent to queue {}, messageId provided {}", queue, assignedId);
        return new AtomicReference<Message>(outbound);
    }
    catch (Exception e) {
        log.error("Failed to send message to queue",e);
        throw new SipJmsException("Failed to send message to queue", e);
    }
}
/**
 * Logs the queue name at debug level and asks {@code context} for a queue with that name.
 *
 * @param context JMS context used to create the queue object
 * @param queue name of the queue
 * @return the destination returned by {@code context.createQueue(queue)}
 */
private Destination createDestination(JMSContext context, String queue){
    log.debug("Creating destination queue {} connection",queue);
    final Destination target = context.createQueue(queue);
    return target;
}
我连续发送了 N 条消息,日志显示生成的 JMSMessageId 始终相同。
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d81619824
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d83619824
等
据我所知,JMSMessageId 应该是唯一的,它的冲突会导致问题。
O'Reilly 的书中指出:
JMSMessageID 是一个字符串值,用于唯一标识一条消息。标识符的唯一性取决于供应商。 JMSMessageID 对于需要对消息进行唯一索引的 JMS 消费者应用程序中的历史存储库很有用。
与 JMSCorrelationID 结合使用,JMSMessageID 也可用于关联消息:
String messageid = message.getJMSMessageID();
那么,为什么 MessageId 不是唯一的?(甚至在应用程序的多次运行之间也相同。)
final AtomicReference<Message> msg = new AtomicReference<>();
为什么要使用 "final"?删除它并重试。
消息 ID 是唯一的,我用 * 标记了不同的那一位:
414d5120444556494d53514d20202020551c3f5d81619824
                                         *
414d5120444556494d53514d20202020551c3f5d83619824
我创建了一个简单的 JMS 程序,它将 5 条消息放入一个队列,每次放入后,它都会输出 JMSMessageId。
示例输出:
2019/08/13 19:15:18.824 MQTestJMS11x5: testConn: successfully connected.
2019/08/13 19:15:18.845 MQTestJMS11x5: testConn: successfully opened TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: Sending request to queue:///TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg:
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201102
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201103
2019/08/13 19:15:18.888 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201104
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201105
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201106
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Closed session
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Stopped connection
2019/08/13 19:15:18.893 MQTestJMS11x5: testConn: Closed connection
请注意,每条消息 ID 都是唯一的。
这是生成输出的 JMS 程序:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;
/**
* Program Name
* MQTestJMS11x5
*
* Description
* This java JMS class will connect to a remote queue manager and put 5 messages to a queue.
*
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class MQTestJMS11x5
{
    /**
     * Timestamp format used by {@link #logger(String)}.
     * NOTE: SimpleDateFormat is not thread-safe; this class does not start any additional threads.
     */
    private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

    /** Number of messages put to the queue by {@link #sendMsg(QueueSession, Queue)}. */
    private static final int MESSAGE_COUNT = 5;

    /** Command-line switches (for example "-h") mapped to the value that followed each of them. */
    private final Hashtable<String,String> params;

    /** Connection factory configured by {@link #init(String[])}; null until then. */
    private MQQueueConnectionFactory mqQCF = null;

    /**
     * The constructor
     */
    public MQTestJMS11x5()
    {
        params = new Hashtable<String,String>();
    }

    /**
     * Make sure the required parameters (-h, -p, -c, -m, -q, -u, -x) are present
     * and that the -p value parses as an integer.
     * @return true when every required switch is present and the port is numeric
     */
    private boolean allParamsPresent()
    {
        boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                    params.containsKey("-c") && params.containsKey("-m") &&
                    params.containsKey("-q") &&
                    params.containsKey("-u") && params.containsKey("-x");
        if (b)
        {
            try
            {
                Integer.parseInt(params.get("-p"));
            }
            catch (NumberFormatException e)
            {
                b = false;
            }
        }
        return b;
    }

    /**
     * Extract the command-line parameters and initialize the MQ variables.
     * @param args switch/value pairs, e.g. "-h 127.0.0.1 -p 1414"
     * @throws IllegalArgumentException if the arguments are not switch/value pairs, a required
     *         switch is missing, the port is not numeric, or the connection factory rejects a setting
     */
    private void init(String[] args) throws IllegalArgumentException
    {
        // Arguments must come as "switch value" pairs.
        if (args.length == 0 || (args.length % 2) != 0)
        {
            throw new IllegalArgumentException();
        }

        for (int i = 0; i < args.length; i += 2)
        {
            params.put(args[i], args[i + 1]);
        }

        if (!allParamsPresent())
        {
            throw new IllegalArgumentException();
        }

        try
        {
            mqQCF = new MQQueueConnectionFactory();
            mqQCF.setQueueManager(params.get("-m"));
            mqQCF.setHostName(params.get("-h"));
            mqQCF.setChannel(params.get("-c"));
            mqQCF.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            // allParamsPresent() has already verified that "-p" parses as an int,
            // so no NumberFormatException fallback is needed here.
            mqQCF.setPort(Integer.parseInt(params.get("-p")));
        }
        catch (JMSException e)
        {
            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            // Keep the original failure attached as the cause.
            throw new IllegalArgumentException(e);
        }
        catch (Exception e)
        {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Test the connection to the queue manager: connect with the -u/-x credentials, open the
     * -q queue, send the test messages, then close the session and stop/close the connection.
     * Failures are logged here rather than propagated.
     * @return true if connecting, opening the queue and sending all completed without an exception
     */
    private boolean testConn()
    {
        QueueConnection conn = null;
        QueueSession session = null;
        boolean succeeded = false;

        try
        {
            conn = mqQCF.createQueueConnection(params.get("-u"), params.get("-x"));
            conn.start();

            session = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            MQTestJMS11x5.logger("successfully connected.");

            Queue myQ = session.createQueue(params.get("-q"));
            MQTestJMS11x5.logger("successfully opened "+ params.get("-q"));

            MQDestination mqd = (MQDestination) myQ;
            mqd.setTargetClient(WMQConstants.WMQ_CLIENT_JMS_COMPLIANT);

            sendMsg( session, myQ);
            succeeded = true;
        }
        catch (JMSException e)
        {
            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
        }
        catch (Exception e)
        {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
        }
        finally
        {
            // Each cleanup step has its own try so that one failure does not skip the others.
            // The logger calls stay in this method because logger() reports its caller's name.
            try
            {
                if (session != null)
                {
                    session.close();
                    MQTestJMS11x5.logger("Closed session");
                }
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("session.close() : " + ex.getLocalizedMessage());
            }

            try
            {
                if (conn != null)
                {
                    conn.stop();
                    MQTestJMS11x5.logger("Stopped connection");
                }
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("connection.stop() : " + ex.getLocalizedMessage());
            }

            try
            {
                if (conn != null)
                {
                    conn.close();
                    MQTestJMS11x5.logger("Closed connection");
                }
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("connection.close() : " + ex.getLocalizedMessage());
            }
        }
        return succeeded;
    }

    /**
     * Send the test messages to a queue, logging the JMSMessageID of each one after it is sent.
     * @param session session used to create the sender and the messages
     * @param myQ queue the messages are sent to
     * @throws JMSException if creating the sender, building a message or sending fails
     */
    private void sendMsg(QueueSession session, Queue myQ) throws JMSException
    {
        QueueSender sender = null;

        try
        {
            MQTestJMS11x5.logger("Sending request to " + myQ.getQueueName());
            MQTestJMS11x5.logger("");

            sender = session.createSender(myQ);

            for (int i=0; i < MESSAGE_COUNT; i++)
            {
                // A new message object is created for every send.
                TextMessage msg = session.createTextMessage();
                msg.setText("This is test message # " + (i+1));

                sender.send(msg);
                MQTestJMS11x5.logger("Sent message: MessageId="+msg.getJMSMessageID());
            }
        }
        finally
        {
            try
            {
                if (sender != null)
                    sender.close();
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("sender.close() : " + ex.getLocalizedMessage());
            }
        }
    }

    /**
     * A simple logger method. Prints a timestamp, the simple class name and method name of the
     * caller, and the supplied text to standard output.
     * @param data text to print
     */
    public static void logger(String data)
    {
        // Index 2 is the method that called logger(): [0] is getStackTrace, [1] is logger itself.
        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
        String className = caller.getClassName();

        // Remove the package info.
        if ( (className != null) && (className.lastIndexOf('.') != -1) )
            className = className.substring(className.lastIndexOf('.')+1);

        System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+caller.getMethodName()+": "+data);
    }

    /**
     * mainline. Exits with status 0 when the messages were sent, and with status 1 when the
     * arguments are invalid or the connection/send failed.
     * @param args command-line switch/value pairs
     */
    public static void main(String[] args)
    {
        MQTestJMS11x5 write = new MQTestJMS11x5();
        boolean succeeded = false;

        try
        {
            write.init(args);
            succeeded = write.testConn();
        }
        catch (IllegalArgumentException e)
        {
            MQTestJMS11x5.logger("Usage: java MQTestJMS11x5 -m QueueManagerName -h host -p port -c channel -q JMS_Queue_Name -u UserID -x Password");
            System.exit(1);
        }
        catch (Exception e)
        {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            System.exit(1);
        }

        // testConn() logs its own failures; reflect them in the exit status.
        System.exit(succeeded ? 0 : 1);
    }
}
我目前正在尝试使用纯 Java 将一批简单的消息发送到队列。
/**
 * Sends {@code message} as a JMS text message to the queue named {@code queue}.
 *
 * <p>A new {@code JMSContext} is obtained from {@code connectionFactory} on every call and is
 * closed by the try-with-resources statement when the method exits.
 *
 * @param message text placed in the body of the message
 * @param queue name of the destination queue
 * @return a reference holding the message object that was handed to the producer
 * @throws SipJmsException wrapping any exception raised while creating, sending or logging the message
 */
public AtomicReference<Message> doSend(String message, String queue){
    try (JMSContext jmsContext = connectionFactory.createContext()) {
        final TextMessage outbound = jmsContext.createTextMessage(message);
        log.info("Sending message to queue {}", queue);
        // Producer is created first, then the destination, exactly as in the single-expression form.
        jmsContext.createProducer().send(createDestination(jmsContext, queue), outbound);
        // The ID is read only after send() has returned.
        final String assignedId = outbound.getJMSMessageID();
        log.info("Message sent to queue {}, messageId provided {}", queue, assignedId);
        return new AtomicReference<Message>(outbound);
    }
    catch (Exception e) {
        log.error("Failed to send message to queue",e);
        throw new SipJmsException("Failed to send message to queue", e);
    }
}
/**
 * Logs the queue name at debug level and asks {@code context} for a queue with that name.
 *
 * @param context JMS context used to create the queue object
 * @param queue name of the queue
 * @return the destination returned by {@code context.createQueue(queue)}
 */
private Destination createDestination(JMSContext context, String queue){
    log.debug("Creating destination queue {} connection",queue);
    final Destination target = context.createQueue(queue);
    return target;
}
我连续发送了 N 条消息,日志显示生成的 JMSMessageId 始终相同。
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d81619824
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d83619824
等
据我所知,JMSMessageId 应该是唯一的,它的冲突会导致问题。
O'Reilly 的书中指出:
JMSMessageID 是一个字符串值,用于唯一标识一条消息。标识符的唯一性取决于供应商。 JMSMessageID 对于需要对消息进行唯一索引的 JMS 消费者应用程序中的历史存储库很有用。 与 JMSCorrelationID 结合使用,JMSMessageID 也可用于关联消息: String messageid = message.getJMSMessageID();
那么,为什么 MessageId 不是唯一的?(甚至在应用程序的多次运行之间也相同。)
final AtomicReference<Message> msg = new AtomicReference<>();
为什么要使用 "final"?删除它并重试。
消息 ID 是唯一的,我用 * 标记了不同的那一位:
414d5120444556494d53514d20202020551c3f5d81619824
                                         *
414d5120444556494d53514d20202020551c3f5d83619824
我创建了一个简单的 JMS 程序,它将 5 条消息放入一个队列,每次放入后,它都会输出 JMSMessageId。
示例输出:
2019/08/13 19:15:18.824 MQTestJMS11x5: testConn: successfully connected.
2019/08/13 19:15:18.845 MQTestJMS11x5: testConn: successfully opened TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: Sending request to queue:///TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg:
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201102
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201103
2019/08/13 19:15:18.888 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201104
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201105
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201106
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Closed session
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Stopped connection
2019/08/13 19:15:18.893 MQTestJMS11x5: testConn: Closed connection
请注意,每条消息 ID 都是唯一的。
这是生成输出的 JMS 程序:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;
/**
* Program Name
* MQTestJMS11x5
*
* Description
* This java JMS class will connect to a remote queue manager and put 5 messages to a queue.
*
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class MQTestJMS11x5
{
    /**
     * Timestamp format used by {@link #logger(String)}.
     * NOTE: SimpleDateFormat is not thread-safe; this class does not start any additional threads.
     */
    private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

    /** Number of messages put to the queue by {@link #sendMsg(QueueSession, Queue)}. */
    private static final int MESSAGE_COUNT = 5;

    /** Command-line switches (for example "-h") mapped to the value that followed each of them. */
    private final Hashtable<String,String> params;

    /** Connection factory configured by {@link #init(String[])}; null until then. */
    private MQQueueConnectionFactory mqQCF = null;

    /**
     * The constructor
     */
    public MQTestJMS11x5()
    {
        params = new Hashtable<String,String>();
    }

    /**
     * Make sure the required parameters (-h, -p, -c, -m, -q, -u, -x) are present
     * and that the -p value parses as an integer.
     * @return true when every required switch is present and the port is numeric
     */
    private boolean allParamsPresent()
    {
        boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                    params.containsKey("-c") && params.containsKey("-m") &&
                    params.containsKey("-q") &&
                    params.containsKey("-u") && params.containsKey("-x");
        if (b)
        {
            try
            {
                Integer.parseInt(params.get("-p"));
            }
            catch (NumberFormatException e)
            {
                b = false;
            }
        }
        return b;
    }

    /**
     * Extract the command-line parameters and initialize the MQ variables.
     * @param args switch/value pairs, e.g. "-h 127.0.0.1 -p 1414"
     * @throws IllegalArgumentException if the arguments are not switch/value pairs, a required
     *         switch is missing, the port is not numeric, or the connection factory rejects a setting
     */
    private void init(String[] args) throws IllegalArgumentException
    {
        // Arguments must come as "switch value" pairs.
        if (args.length == 0 || (args.length % 2) != 0)
        {
            throw new IllegalArgumentException();
        }

        for (int i = 0; i < args.length; i += 2)
        {
            params.put(args[i], args[i + 1]);
        }

        if (!allParamsPresent())
        {
            throw new IllegalArgumentException();
        }

        try
        {
            mqQCF = new MQQueueConnectionFactory();
            mqQCF.setQueueManager(params.get("-m"));
            mqQCF.setHostName(params.get("-h"));
            mqQCF.setChannel(params.get("-c"));
            mqQCF.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            // allParamsPresent() has already verified that "-p" parses as an int,
            // so no NumberFormatException fallback is needed here.
            mqQCF.setPort(Integer.parseInt(params.get("-p")));
        }
        catch (JMSException e)
        {
            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            // Keep the original failure attached as the cause.
            throw new IllegalArgumentException(e);
        }
        catch (Exception e)
        {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Test the connection to the queue manager: connect with the -u/-x credentials, open the
     * -q queue, send the test messages, then close the session and stop/close the connection.
     * Failures are logged here rather than propagated.
     * @return true if connecting, opening the queue and sending all completed without an exception
     */
    private boolean testConn()
    {
        QueueConnection conn = null;
        QueueSession session = null;
        boolean succeeded = false;

        try
        {
            conn = mqQCF.createQueueConnection(params.get("-u"), params.get("-x"));
            conn.start();

            session = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            MQTestJMS11x5.logger("successfully connected.");

            Queue myQ = session.createQueue(params.get("-q"));
            MQTestJMS11x5.logger("successfully opened "+ params.get("-q"));

            MQDestination mqd = (MQDestination) myQ;
            mqd.setTargetClient(WMQConstants.WMQ_CLIENT_JMS_COMPLIANT);

            sendMsg( session, myQ);
            succeeded = true;
        }
        catch (JMSException e)
        {
            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
        }
        catch (Exception e)
        {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
        }
        finally
        {
            // Each cleanup step has its own try so that one failure does not skip the others.
            // The logger calls stay in this method because logger() reports its caller's name.
            try
            {
                if (session != null)
                {
                    session.close();
                    MQTestJMS11x5.logger("Closed session");
                }
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("session.close() : " + ex.getLocalizedMessage());
            }

            try
            {
                if (conn != null)
                {
                    conn.stop();
                    MQTestJMS11x5.logger("Stopped connection");
                }
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("connection.stop() : " + ex.getLocalizedMessage());
            }

            try
            {
                if (conn != null)
                {
                    conn.close();
                    MQTestJMS11x5.logger("Closed connection");
                }
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("connection.close() : " + ex.getLocalizedMessage());
            }
        }
        return succeeded;
    }

    /**
     * Send the test messages to a queue, logging the JMSMessageID of each one after it is sent.
     * @param session session used to create the sender and the messages
     * @param myQ queue the messages are sent to
     * @throws JMSException if creating the sender, building a message or sending fails
     */
    private void sendMsg(QueueSession session, Queue myQ) throws JMSException
    {
        QueueSender sender = null;

        try
        {
            MQTestJMS11x5.logger("Sending request to " + myQ.getQueueName());
            MQTestJMS11x5.logger("");

            sender = session.createSender(myQ);

            for (int i=0; i < MESSAGE_COUNT; i++)
            {
                // A new message object is created for every send.
                TextMessage msg = session.createTextMessage();
                msg.setText("This is test message # " + (i+1));

                sender.send(msg);
                MQTestJMS11x5.logger("Sent message: MessageId="+msg.getJMSMessageID());
            }
        }
        finally
        {
            try
            {
                if (sender != null)
                    sender.close();
            }
            catch (Exception ex)
            {
                MQTestJMS11x5.logger("sender.close() : " + ex.getLocalizedMessage());
            }
        }
    }

    /**
     * A simple logger method. Prints a timestamp, the simple class name and method name of the
     * caller, and the supplied text to standard output.
     * @param data text to print
     */
    public static void logger(String data)
    {
        // Index 2 is the method that called logger(): [0] is getStackTrace, [1] is logger itself.
        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
        String className = caller.getClassName();

        // Remove the package info.
        if ( (className != null) && (className.lastIndexOf('.') != -1) )
            className = className.substring(className.lastIndexOf('.')+1);

        System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+caller.getMethodName()+": "+data);
    }

    /**
     * mainline. Exits with status 0 when the messages were sent, and with status 1 when the
     * arguments are invalid or the connection/send failed.
     * @param args command-line switch/value pairs
     */
    public static void main(String[] args)
    {
        MQTestJMS11x5 write = new MQTestJMS11x5();
        boolean succeeded = false;

        try
        {
            write.init(args);
            succeeded = write.testConn();
        }
        catch (IllegalArgumentException e)
        {
            MQTestJMS11x5.logger("Usage: java MQTestJMS11x5 -m QueueManagerName -h host -p port -c channel -q JMS_Queue_Name -u UserID -x Password");
            System.exit(1);
        }
        catch (Exception e)
        {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            System.exit(1);
        }

        // testConn() logs its own failures; reflect them in the exit status.
        System.exit(succeeded ? 0 : 1);
    }
}