尝试使用队列外的 GET 消息(而非 JMS)设置 MQ
Trying to setup MQ with a GET message off the queue (not JMS)
我正在使用低级直接 IBM MQ 库将消息推送到队列并检索它们。我试图设置应用程序以便消息可以进入,比如通过提取数据库的数据然后将记录推送到队列中,实际上相同的代码可以读取消息。我主要是想设置一个线程,一旦消息出现就会拉取消息。
此代码运行,第一个 PUT 工作但第二个不工作并挂起。我是不是看不懂这里的流程
此外,如果我从下面第二个围绕“GET”获取代码,我是否可以编写一个线程,每 500 毫秒调用一次该例程,等待新消息进入。
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// This code hangs !!!! (error here)
mqueue.put(msg, this.mqPMO);
...
public void bootstap() {
MQEnvironment.hostname = "localhost";
MQEnvironment.port = 1414;
MQEnvironment.channel = "DEV.ADMIN.SVRCONN";
MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "my_application_name");
MQEnvironment.enableTracing(5);
MQQueueManager mqManager = null;
MQQueue mqueue = null;
try {
// MQCNO_CLIENT_BINDING is not available for Java or .NET as they have their own mechanisms for choosing the bind type.
final String qmName = "QM1";
final String userId = "admin";
final String Password = "passw0rd";
final Hashtable h = new Hashtable();
h.put(MQConstants.USER_ID_PROPERTY, userId);
h.put(MQConstants.PASSWORD_PROPERTY, Password);
h.put(MQConstants.USE_MQCSP_AUTHENTICATION_PROPERTY, true);
mqManager = new MQQueueManager(qmName, h);
//mqManager = new MQQueueManager(qmName, WMQConstants.WMQ_CM_BINDINGS);
this.mqGMO = new MQGetMessageOptions();
this.mqGMO.options = MQC.MQGMO_NO_SYNCPOINT |
MQC.MQGMO_WAIT |
MQC.MQGMO_CONVERT |
MQC.MQGMO_FAIL_IF_QUIESCING;
this.mqGMO.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
this.mqGMO.waitInterval = MQC.MQWI_UNLIMITED;
int openOptions = MQC.MQOO_INPUT_SHARED |
MQC.MQOO_OUTPUT;
mqueue = mqManager.accessQueue("DEV.QUEUE.1", openOptions);
logger.info(">> Find connection handle queue manager - " + mqueue);
{
final MQMessage msg = new MQMessage();
final String correlId = "0002";
final String byteArry = this.hexStringToByteArray(correlId);
logger.info(">>> correlId: " + correlId);
logger.info(">>> byteArry: " + byteArry);
msg.correlationId = byteArry.getBytes();
msg.format = MQConstants.MQFMT_STRING;
// ... and write some text in UTF8 format
msg.writeUTF("{{ Hello, World }}}");
// Use the default put message options...
// Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// put the message //
mqueue.put(msg, this.mqPMO);
logger.info(" >>> Continue to get routine");
}
{
// This code works !!! get the message
MQMessage retrievedMessage = new MQMessage();
retrievedMessage.correlationId = this.hexStringToByteArray("0001").getBytes();
mqueue.get(retrievedMessage, this.mqGMO);
// And prove we have the message by displaying the UTF message text
String msgText = retrievedMessage.readUTF();
logger.info("~~~~ The message is: " + msgText);
}
{
final MQMessage msg = new MQMessage();
final String correlId = "0001";
final String byteArry = this.hexStringToByteArray(correlId);
logger.info(">>> correlId: " + correlId);
logger.info(">>> byteArry: " + byteArry);
msg.correlationId = byteArry.getBytes();
msg.format = MQConstants.MQFMT_STRING;
// ... and write some text in UTF8 format
msg.writeUTF("{{ Hello, World }}}");
// Use the default put message options...
// Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// This code hangs !!!! (error here)
mqueue.put(msg, this.mqPMO);
}
mqueue.close();
mqManager.disconnect();
} catch(final Exception e) {
logger.error("Error at MQ manager", e);
}
}
首先,不要使用 MQEnvironment class,因为它不是线程安全的。您应该为 MQ 连接信息使用哈希表。
其次,所有最终声明是什么?很奇怪。
你的代码没有任何意义。这是我看到您的代码所做的:
设置MQ连接信息使用MQEnvironmentclass
在哈希表中设置用户 ID 和密码
连接到队列管理器
打开队列
将 'correlId' 设置为“0002”
最终字符串 byteArry = this.hexStringToByteArray(correlId);
这行代码没有任何意义。方法名称与您的代码不匹配。十六进制字符串的格式应为“0002”的“30303032”和 return 字节数组,即 byte[] 但它是 returning 字符串。所以,我不知道 hexStringToByteArray 方法在做什么。
此外,MQMD 结构的 MsgId、CorrelId 和 GroupId 字段的长度为 24 个字节。
- 以UTF格式写入消息数据。为什么?接收应用程序是否需要 UTF 格式?
- 将消息放入队列。
- 将接收消息的 CorrelId 设置为“0001”,但通过 hexStringToByteArray 方法转换为奇怪的表示形式。
- 从等待间隔为无限的队列中获取一条消息。由于队列中没有匹配该 CorrelId 的消息,因此 MQ 客户端库将永远等待!!!!
- 创建另一个 MQMessage 并将 CorrelId 设置为“0001”,但通过 hexStringToByteArray 方法转换为奇怪的表示形式。
- 以UTF格式写入消息数据。为什么?接收应用程序是否需要 UTF 格式?
- 将消息放入队列。
- 关闭队列
- 断开与队列管理器的连接
这是一个功能齐全的 Java/MQ 程序,它将 2 条消息放入具有唯一 CorrelId(即“0001”和“0002”)的队列中,然后检索 CorrelId 为“0002”的消息。
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
/**
* Program Name
* MQTest11B
*
* Description
* This java class will connect to a remote queue manager with the
* MQ setting stored in a HashTable, put 2 message on a queue with unique CorrelIds
* and then retrieve the message with a CorrelId of "0002".
*
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class MQTest11B
{
private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
private Hashtable<String,String> params;
private Hashtable<String,Object> mqht;
private String qMgrName;
private String outputQName;
/**
* The constructor
*/
public MQTest11B()
{
super();
params = new Hashtable<String,String>();
mqht = new Hashtable<String,Object>();
}
/**
* Make sure the required parameters are present.
* @return true/false
*/
private boolean allParamsPresent()
{
boolean b = params.containsKey("-h") && params.containsKey("-p") &&
params.containsKey("-c") && params.containsKey("-m") &&
params.containsKey("-q") &&
params.containsKey("-u") && params.containsKey("-x");
if (b)
{
try
{
Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
b = false;
}
}
return b;
}
/**
* Extract the command-line parameters and initialize the MQ HashTable.
* @param args
* @throws IllegalArgumentException
*/
private void init(String[] args) throws IllegalArgumentException
{
int port = 1414;
if (args.length > 0 && (args.length % 2) == 0)
{
for (int i = 0; i < args.length; i += 2)
{
params.put(args[i], args[i + 1]);
}
}
else
{
throw new IllegalArgumentException();
}
if (allParamsPresent())
{
qMgrName = (String) params.get("-m");
outputQName = (String) params.get("-q");
try
{
port = Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
port = 1414;
}
mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
// I don't want to see MQ exceptions at the console.
MQException.log = null;
}
else
{
throw new IllegalArgumentException();
}
}
/**
* Connect, open queue, write a message, close queue and disconnect.
*
*/
private void testSendAndReceive()
{
MQQueueManager qMgr = null;
MQQueue queue = null;
int openOptions = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = CMQC.MQWI_UNLIMITED;
MQMessage sendmsg;
String msgData;
DecimalFormat df = new DecimalFormat("0000");
try
{
qMgr = new MQQueueManager(qMgrName, mqht);
logger("successfully connected to "+ qMgrName);
queue = qMgr.accessQueue(outputQName, openOptions);
logger("successfully opened "+ outputQName);
/*
* Code to send 2 messages with a specific CorrelId. i.e. 0001 and 0002
*/
for (int i=0; i < 2; i++)
{
// Define a simple MQ message, and write some text
sendmsg = new MQMessage();
sendmsg.format = CMQC.MQFMT_STRING;
sendmsg.messageId = CMQC.MQMI_NONE;
sendmsg.correlationId = df.format(i+1).getBytes();
// Write message data
msgData = "This is a test message from MQTest11B. CorrelID is "+new String(sendmsg.correlationId);
sendmsg.writeString(msgData);
// put the message on the queue
queue.put(sendmsg, pmo);
logger("Sent: Message Data>>>" + msgData);
}
/*
* Code to receive a message with a specific CorrelId. i.e. 0002
*/
// Define a simple MQ message, and write some text
MQMessage receiveMsg = new MQMessage();
receiveMsg.messageId = CMQC.MQMI_NONE;
receiveMsg.correlationId = "0002".getBytes();
// get the message on the queue
queue.get(receiveMsg, gmo);
if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
{
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
logger("Received: Message Data>>>" + msgStr);
}
else
{
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
logger("Received: Message Data>>>" + new String(b));
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
catch (IOException e)
{
logger("IOException:" +e.getLocalizedMessage());
}
finally
{
try
{
if (queue != null)
{
queue.close();
logger("closed: "+ outputQName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
try
{
if (qMgr != null)
{
qMgr.disconnect();
logger("disconnected from "+ qMgrName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
}
}
/**
* A simple logger method
* @param data
*/
public static void logger(String data)
{
String className = Thread.currentThread().getStackTrace()[2].getClassName();
// Remove the package info.
if ( (className != null) && (className.lastIndexOf('.') != -1) )
className = className.substring(className.lastIndexOf('.')+1);
System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
}
/**
* main line
* @param args
*/
public static void main(String[] args)
{
MQTest11B write = new MQTest11B();
try
{
write.init(args);
write.testSendAndReceive();
}
catch (IllegalArgumentException e)
{
logger("Usage: java MQTest11B -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
System.exit(1);
}
System.exit(0);
}
}
输出应该是这样的:
2021/07/02 14:01:59.316 MQTest11B: testSendAndReceive: successfully connected to MQA1
2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: successfully opened TEST.Q1
2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0001
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Received: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: closed: TEST.Q1
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: disconnected from MQA1
我正在使用低级直接 IBM MQ 库将消息推送到队列并检索它们。我试图设置应用程序以便消息可以进入,比如通过提取数据库的数据然后将记录推送到队列中,实际上相同的代码可以读取消息。我主要是想设置一个线程,一旦消息出现就会拉取消息。
此代码运行,第一个 PUT 工作但第二个不工作并挂起。我是不是看不懂这里的流程
此外,如果我从下面第二个围绕“GET”获取代码,我是否可以编写一个线程,每 500 毫秒调用一次该例程,等待新消息进入。
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// This code hangs !!!! (error here)
mqueue.put(msg, this.mqPMO);
...
public void bootstap() {
MQEnvironment.hostname = "localhost";
MQEnvironment.port = 1414;
MQEnvironment.channel = "DEV.ADMIN.SVRCONN";
MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "my_application_name");
MQEnvironment.enableTracing(5);
MQQueueManager mqManager = null;
MQQueue mqueue = null;
try {
// MQCNO_CLIENT_BINDING is not available for Java or .NET as they have their own mechanisms for choosing the bind type.
final String qmName = "QM1";
final String userId = "admin";
final String Password = "passw0rd";
final Hashtable h = new Hashtable();
h.put(MQConstants.USER_ID_PROPERTY, userId);
h.put(MQConstants.PASSWORD_PROPERTY, Password);
h.put(MQConstants.USE_MQCSP_AUTHENTICATION_PROPERTY, true);
mqManager = new MQQueueManager(qmName, h);
//mqManager = new MQQueueManager(qmName, WMQConstants.WMQ_CM_BINDINGS);
this.mqGMO = new MQGetMessageOptions();
this.mqGMO.options = MQC.MQGMO_NO_SYNCPOINT |
MQC.MQGMO_WAIT |
MQC.MQGMO_CONVERT |
MQC.MQGMO_FAIL_IF_QUIESCING;
this.mqGMO.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
this.mqGMO.waitInterval = MQC.MQWI_UNLIMITED;
int openOptions = MQC.MQOO_INPUT_SHARED |
MQC.MQOO_OUTPUT;
mqueue = mqManager.accessQueue("DEV.QUEUE.1", openOptions);
logger.info(">> Find connection handle queue manager - " + mqueue);
{
final MQMessage msg = new MQMessage();
final String correlId = "0002";
final String byteArry = this.hexStringToByteArray(correlId);
logger.info(">>> correlId: " + correlId);
logger.info(">>> byteArry: " + byteArry);
msg.correlationId = byteArry.getBytes();
msg.format = MQConstants.MQFMT_STRING;
// ... and write some text in UTF8 format
msg.writeUTF("{{ Hello, World }}}");
// Use the default put message options...
// Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// put the message //
mqueue.put(msg, this.mqPMO);
logger.info(" >>> Continue to get routine");
}
{
// This code works !!! get the message
MQMessage retrievedMessage = new MQMessage();
retrievedMessage.correlationId = this.hexStringToByteArray("0001").getBytes();
mqueue.get(retrievedMessage, this.mqGMO);
// And prove we have the message by displaying the UTF message text
String msgText = retrievedMessage.readUTF();
logger.info("~~~~ The message is: " + msgText);
}
{
final MQMessage msg = new MQMessage();
final String correlId = "0001";
final String byteArry = this.hexStringToByteArray(correlId);
logger.info(">>> correlId: " + correlId);
logger.info(">>> byteArry: " + byteArry);
msg.correlationId = byteArry.getBytes();
msg.format = MQConstants.MQFMT_STRING;
// ... and write some text in UTF8 format
msg.writeUTF("{{ Hello, World }}}");
// Use the default put message options...
// Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// This code hangs !!!! (error here)
mqueue.put(msg, this.mqPMO);
}
mqueue.close();
mqManager.disconnect();
} catch(final Exception e) {
logger.error("Error at MQ manager", e);
}
}
首先,不要使用 MQEnvironment class,因为它不是线程安全的。您应该为 MQ 连接信息使用哈希表。
其次,所有最终声明是什么?很奇怪。
你的代码没有任何意义。这是我看到您的代码所做的:
设置MQ连接信息使用MQEnvironmentclass
在哈希表中设置用户 ID 和密码
连接到队列管理器
打开队列
将 'correlId' 设置为“0002”
最终字符串 byteArry = this.hexStringToByteArray(correlId);
这行代码没有任何意义。方法名称与您的代码不匹配。十六进制字符串的格式应为“0002”的“30303032”和 return 字节数组,即 byte[] 但它是 returning 字符串。所以,我不知道 hexStringToByteArray 方法在做什么。
此外,MQMD 结构的 MsgId、CorrelId 和 GroupId 字段的长度为 24 个字节。
- 以UTF格式写入消息数据。为什么?接收应用程序是否需要 UTF 格式?
- 将消息放入队列。
- 将接收消息的 CorrelId 设置为“0001”,但通过 hexStringToByteArray 方法转换为奇怪的表示形式。
- 从等待间隔为无限的队列中获取一条消息。由于队列中没有匹配该 CorrelId 的消息,因此 MQ 客户端库将永远等待!!!!
- 创建另一个 MQMessage 并将 CorrelId 设置为“0001”,但通过 hexStringToByteArray 方法转换为奇怪的表示形式。
- 以UTF格式写入消息数据。为什么?接收应用程序是否需要 UTF 格式?
- 将消息放入队列。
- 关闭队列
- 断开与队列管理器的连接
这是一个功能齐全的 Java/MQ 程序,它将 2 条消息放入具有唯一 CorrelId(即“0001”和“0002”)的队列中,然后检索 CorrelId 为“0002”的消息。
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
/**
* Program Name
* MQTest11B
*
* Description
* This java class will connect to a remote queue manager with the
* MQ setting stored in a HashTable, put 2 message on a queue with unique CorrelIds
* and then retrieve the message with a CorrelId of "0002".
*
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class MQTest11B
{
private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
private Hashtable<String,String> params;
private Hashtable<String,Object> mqht;
private String qMgrName;
private String outputQName;
/**
* The constructor
*/
public MQTest11B()
{
super();
params = new Hashtable<String,String>();
mqht = new Hashtable<String,Object>();
}
/**
* Make sure the required parameters are present.
* @return true/false
*/
private boolean allParamsPresent()
{
boolean b = params.containsKey("-h") && params.containsKey("-p") &&
params.containsKey("-c") && params.containsKey("-m") &&
params.containsKey("-q") &&
params.containsKey("-u") && params.containsKey("-x");
if (b)
{
try
{
Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
b = false;
}
}
return b;
}
/**
* Extract the command-line parameters and initialize the MQ HashTable.
* @param args
* @throws IllegalArgumentException
*/
private void init(String[] args) throws IllegalArgumentException
{
int port = 1414;
if (args.length > 0 && (args.length % 2) == 0)
{
for (int i = 0; i < args.length; i += 2)
{
params.put(args[i], args[i + 1]);
}
}
else
{
throw new IllegalArgumentException();
}
if (allParamsPresent())
{
qMgrName = (String) params.get("-m");
outputQName = (String) params.get("-q");
try
{
port = Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
port = 1414;
}
mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
// I don't want to see MQ exceptions at the console.
MQException.log = null;
}
else
{
throw new IllegalArgumentException();
}
}
/**
* Connect, open queue, write a message, close queue and disconnect.
*
*/
private void testSendAndReceive()
{
MQQueueManager qMgr = null;
MQQueue queue = null;
int openOptions = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = CMQC.MQWI_UNLIMITED;
MQMessage sendmsg;
String msgData;
DecimalFormat df = new DecimalFormat("0000");
try
{
qMgr = new MQQueueManager(qMgrName, mqht);
logger("successfully connected to "+ qMgrName);
queue = qMgr.accessQueue(outputQName, openOptions);
logger("successfully opened "+ outputQName);
/*
* Code to send 2 messages with a specific CorrelId. i.e. 0001 and 0002
*/
for (int i=0; i < 2; i++)
{
// Define a simple MQ message, and write some text
sendmsg = new MQMessage();
sendmsg.format = CMQC.MQFMT_STRING;
sendmsg.messageId = CMQC.MQMI_NONE;
sendmsg.correlationId = df.format(i+1).getBytes();
// Write message data
msgData = "This is a test message from MQTest11B. CorrelID is "+new String(sendmsg.correlationId);
sendmsg.writeString(msgData);
// put the message on the queue
queue.put(sendmsg, pmo);
logger("Sent: Message Data>>>" + msgData);
}
/*
* Code to receive a message with a specific CorrelId. i.e. 0002
*/
// Define a simple MQ message, and write some text
MQMessage receiveMsg = new MQMessage();
receiveMsg.messageId = CMQC.MQMI_NONE;
receiveMsg.correlationId = "0002".getBytes();
// get the message on the queue
queue.get(receiveMsg, gmo);
if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
{
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
logger("Received: Message Data>>>" + msgStr);
}
else
{
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
logger("Received: Message Data>>>" + new String(b));
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
catch (IOException e)
{
logger("IOException:" +e.getLocalizedMessage());
}
finally
{
try
{
if (queue != null)
{
queue.close();
logger("closed: "+ outputQName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
try
{
if (qMgr != null)
{
qMgr.disconnect();
logger("disconnected from "+ qMgrName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
}
}
/**
* A simple logger method
* @param data
*/
public static void logger(String data)
{
String className = Thread.currentThread().getStackTrace()[2].getClassName();
// Remove the package info.
if ( (className != null) && (className.lastIndexOf('.') != -1) )
className = className.substring(className.lastIndexOf('.')+1);
System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
}
/**
* main line
* @param args
*/
public static void main(String[] args)
{
MQTest11B write = new MQTest11B();
try
{
write.init(args);
write.testSendAndReceive();
}
catch (IllegalArgumentException e)
{
logger("Usage: java MQTest11B -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
System.exit(1);
}
System.exit(0);
}
}
输出应该是这样的:
2021/07/02 14:01:59.316 MQTest11B: testSendAndReceive: successfully connected to MQA1
2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: successfully opened TEST.Q1
2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0001
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Received: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: closed: TEST.Q1
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: disconnected from MQA1