使用来自 IBM MQ 的所有消息
consume all messages from IBM MQ
我想消费来自 MQ 的所有消息。
/**
 * Connects to the queue manager through the JMS simplified API and prints
 * the body of a single message from {@code QUEUE_NAME}.
 *
 * <p>Only one {@code receiveBody} call is made, so at most one message is
 * consumed per run.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the connection factory cannot be obtained or created
 *         (the MQ factory calls declare the checked JMSException)
 */
public static void main(String[] args) throws Exception
{
    JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
    // The type is JmsConnectionFactory; "JmsConnectionFactor" does not exist.
    JmsConnectionFactory cf = ff.createConnectionFactory();

    // try-with-resources closes the context (and its consumer) even if
    // receiving fails, so the connection is not leaked.
    try (JMSContext context = cf.createContext())
    {
        Destination destination = context.createQueue(QUEUE_NAME);
        JMSConsumer consumer = context.createConsumer(destination);

        // Waits up to 15090 ms; msg is null when nothing arrived in that time.
        String msg = consumer.receiveBody(String.class, 15090);
        System.out.println(msg);
    }
}
这段代码只能读取一条消息。我怎样才能消费所有消息?另外,有没有更简单的方法可以删除队列中的所有消息,而无需读取或消费它们?
JMS API 一次只消费一条消息,因此您需要将 receiveBody 调用放入循环中,例如:
/**
 * Drains {@code QUEUE_NAME}: receives messages one at a time and prints each
 * body until a receive times out.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the connection factory cannot be obtained or created
 *         (the MQ factory calls declare the checked JMSException)
 */
public static void main(String[] args) throws Exception {
    JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
    // The type is JmsConnectionFactory; "JmsConnectionFactor" does not exist.
    JmsConnectionFactory cf = ff.createConnectionFactory();

    // try-with-resources closes the context (and its consumer) even if
    // receiving fails, so the connection is not leaked.
    try (JMSContext context = cf.createContext()) {
        Destination destination = context.createQueue(QUEUE_NAME);
        JMSConsumer consumer = context.createConsumer(destination);

        // receiveBody returns null once the 15090 ms timeout expires with no
        // message. Testing before printing keeps that terminating null from
        // being written out as if it were a message.
        String msg;
        while ((msg = consumer.receiveBody(String.class, 15090)) != null) {
            System.out.println(msg);
        }
    }
}
当 receiveBody 返回 null 时,表示在超时时间内没有收到消息,即队列中已没有更多消息。
JMS API 没有定义从队列中删除所有消息的任何方法,但是大多数 JMS 服务器都有一个特定于实现的管理 API,您可以通过它执行这类操作。
如果正如您的问题所暗示的那样,您只想清空所有消息的队列而不是在应用程序中实际读取它们,您可以考虑简单地使用管理 MQSC 命令:-
CLEAR QLOCAL(queue-name)
您可以将其输入 runmqsc
工具以将其发送给队列管理器。
is there any simpler way to delete all messages in queue without even
reading or consuming them?
是的。您可以使用 MQ PCF Clear Queue 命令,只要没有应用程序为输入或输出打开队列。即 IPPROCS 和 OPPROCS 必须为零才能工作。 Morag 的 MQSC Clear 命令也是如此。
这是一个功能齐全的 Java MQ PCF 程序,用于清除队列。
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import com.ibm.mq.MQException;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.constants.CMQCFC;
import com.ibm.mq.headers.MQDataException;
import com.ibm.mq.headers.pcf.PCFMessage;
import com.ibm.mq.headers.pcf.PCFMessageAgent;
/**
 * Program Name
 *  MQClearQueue01
 *
 * Description
 *  This java class issues a PCF "Clear Q" command for a queue to delete all messages
 *  in the queue of a remote queue manager.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * Exit Status
 *  0 - the queue was cleared
 *  1 - the command-line parameters were missing or invalid
 *  2 - connecting to the queue manager or clearing the queue failed
 *
 * @author Roger Lacroix
 */
public class MQClearQueue01
{
    /**
     * Timestamp layout prefixed to every log line. SimpleDateFormat is not
     * thread-safe; this class never starts additional threads.
     */
    private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

    /** Command-line switches mapped to their values, e.g. "-q" to the queue name. */
    private final Hashtable<String,String> params;

    /** Connection properties handed to the MQQueueManager constructor. */
    private final Hashtable<String,Object> mqht;

    public MQClearQueue01()
    {
        super();
        params = new Hashtable<String,String>();
        mqht = new Hashtable<String,Object>();
    }

    /**
     * Make sure the required parameters are present and the port is numeric.
     * @return true when every required switch was supplied and "-p" parses as an int
     */
    private boolean allParamsPresent()
    {
        boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                    params.containsKey("-c") && params.containsKey("-m") &&
                    params.containsKey("-q") &&
                    params.containsKey("-u") && params.containsKey("-x");
        if (b)
        {
            try
            {
                Integer.parseInt(params.get("-p"));
            }
            catch (NumberFormatException e)
            {
                b = false;
            }
        }
        return b;
    }

    /**
     * Extract the command-line parameters and initialize the MQ HashTable.
     * @param args switch/value pairs as given on the command line
     * @throws IllegalArgumentException if the arguments are not switch/value
     *         pairs, a required switch is missing, or the port is not numeric
     */
    private void init(String[] args) throws IllegalArgumentException
    {
        if (args.length == 0 || (args.length % 2) != 0)
        {
            throw new IllegalArgumentException("expected switch/value pairs");
        }
        for (int i = 0; i < args.length; i += 2)
        {
            params.put(args[i], args[i + 1]);
        }

        if (!allParamsPresent())
        {
            throw new IllegalArgumentException("missing or invalid parameter");
        }

        // allParamsPresent() has already proven that "-p" parses, so no
        // fallback port is needed here.
        int port = Integer.parseInt(params.get("-p"));

        mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
        mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
        mqht.put(CMQC.PORT_PROPERTY, Integer.valueOf(port));
        mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
        mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

        // I don't want to see MQ exceptions at the console.
        MQException.log = null;
    }

    /**
     * Handle connecting to the queue manager, issuing PCF command then
     * looping through PCF response messages and disconnecting from
     * the queue manager.
     * @return true when at least one response arrived and every response
     *         reported MQCC_OK; false on any failure or exception
     */
    private boolean doPCF()
    {
        MQQueueManager qMgr = null;
        PCFMessageAgent agent = null;
        boolean cleared = false;

        String qMgrName = params.get("-m");
        String queueName = params.get("-q");

        try
        {
            qMgr = new MQQueueManager(qMgrName, mqht);
            MQClearQueue01.logger("successfully connected to "+ qMgrName);

            agent = new PCFMessageAgent(qMgr);
            MQClearQueue01.logger("successfully created agent");

            // https://www.ibm.com/support/knowledgecenter/SSFKSJ_latest/com.ibm.mq.ref.adm.doc/q087420_.html
            PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CLEAR_Q);
            request.addParameter(CMQC.MQCA_Q_NAME, queueName);

            PCFMessage[] responses = agent.send(request);
            MQClearQueue01.logger("responses.length="+responses.length);

            cleared = responses.length > 0;
            for (PCFMessage response : responses)
            {
                if (response.getCompCode() == CMQC.MQCC_OK)
                {
                    MQClearQueue01.logger("Successfully cleared queue '"+queueName+"' of messages.");
                }
                else
                {
                    cleared = false;
                    MQClearQueue01.logger("Error: Failed to clear queue '"+queueName+"' of messages.");
                }
            }
        }
        catch (MQException e)
        {
            MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
        }
        catch (IOException e)
        {
            MQClearQueue01.logger("IOException:" +e.getLocalizedMessage());
        }
        catch (MQDataException e)
        {
            MQClearQueue01.logger("MQDataException:" +e.getLocalizedMessage());
        }
        finally
        {
            // Each disconnect gets its own try so that a failure releasing
            // the agent does not prevent the queue manager disconnect.
            try
            {
                if (agent != null)
                {
                    agent.disconnect();
                    MQClearQueue01.logger("disconnected from agent");
                }
            }
            catch (MQDataException e)
            {
                MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }

            try
            {
                if (qMgr != null)
                {
                    qMgr.disconnect();
                    MQClearQueue01.logger("disconnected from "+ qMgrName);
                }
            }
            catch (MQException e)
            {
                MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }
        }
        return cleared;
    }

    /**
     * A simple logger method. Writes a timestamp, the caller's simple class
     * name, the caller's method name and the text to standard output.
     * @param data text to log
     */
    public static void logger(String data)
    {
        // Index 2 is the caller of logger(): 0 is getStackTrace, 1 is logger itself.
        // Captured once so class and method name come from the same frame.
        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
        String className = caller.getClassName();

        // Remove the package info.
        if ( (className != null) && (className.lastIndexOf('.') != -1) )
        {
            className = className.substring(className.lastIndexOf('.')+1);
        }

        System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+caller.getMethodName()+": "+data);
    }

    /**
     * Parses the command line, clears the queue and exits with a status that
     * reflects the outcome: 0 cleared, 1 bad usage, 2 clear failed.
     * @param args switch/value pairs, see the usage message
     */
    public static void main(String[] args)
    {
        MQClearQueue01 mqcq = new MQClearQueue01();
        boolean cleared = false;

        try
        {
            mqcq.init(args);
            cleared = mqcq.doPCF();
        }
        catch (IllegalArgumentException e)
        {
            MQClearQueue01.logger("Usage: java MQClearQueue01 -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
            System.exit(1);
        }

        // Report failure to the calling shell instead of always exiting 0.
        System.exit(cleared ? 0 : 2);
    }
}
我想消费来自 MQ 的所有消息。
/**
 * Connects to the queue manager through the JMS simplified API and prints
 * the body of a single message from {@code QUEUE_NAME}.
 *
 * <p>Only one {@code receiveBody} call is made, so at most one message is
 * consumed per run.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the connection factory cannot be obtained or created
 *         (the MQ factory calls declare the checked JMSException)
 */
public static void main(String[] args) throws Exception
{
    JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
    // The type is JmsConnectionFactory; "JmsConnectionFactor" does not exist.
    JmsConnectionFactory cf = ff.createConnectionFactory();

    // try-with-resources closes the context (and its consumer) even if
    // receiving fails, so the connection is not leaked.
    try (JMSContext context = cf.createContext())
    {
        Destination destination = context.createQueue(QUEUE_NAME);
        JMSConsumer consumer = context.createConsumer(destination);

        // Waits up to 15090 ms; msg is null when nothing arrived in that time.
        String msg = consumer.receiveBody(String.class, 15090);
        System.out.println(msg);
    }
}
这段代码只能读取一条消息。我怎样才能消费所有消息?另外,有没有更简单的方法可以删除队列中的所有消息,而无需读取或消费它们?
JMS API 一次只消费一条消息,因此您需要将 receiveBody 调用放入循环中,例如:
/**
 * Drains {@code QUEUE_NAME}: receives messages one at a time and prints each
 * body until a receive times out.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the connection factory cannot be obtained or created
 *         (the MQ factory calls declare the checked JMSException)
 */
public static void main(String[] args) throws Exception {
    JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
    // The type is JmsConnectionFactory; "JmsConnectionFactor" does not exist.
    JmsConnectionFactory cf = ff.createConnectionFactory();

    // try-with-resources closes the context (and its consumer) even if
    // receiving fails, so the connection is not leaked.
    try (JMSContext context = cf.createContext()) {
        Destination destination = context.createQueue(QUEUE_NAME);
        JMSConsumer consumer = context.createConsumer(destination);

        // receiveBody returns null once the 15090 ms timeout expires with no
        // message. Testing before printing keeps that terminating null from
        // being written out as if it were a message.
        String msg;
        while ((msg = consumer.receiveBody(String.class, 15090)) != null) {
            System.out.println(msg);
        }
    }
}
当 receiveBody 返回 null 时,表示在超时时间内没有收到消息,即队列中已没有更多消息。
JMS API 没有定义从队列中删除所有消息的任何方法,但是大多数 JMS 服务器都有一个特定于实现的管理 API,您可以通过它执行这类操作。
如果正如您的问题所暗示的那样,您只想清空所有消息的队列而不是在应用程序中实际读取它们,您可以考虑简单地使用管理 MQSC 命令:-
CLEAR QLOCAL(queue-name)
您可以将其输入 runmqsc
工具以将其发送给队列管理器。
is there any simpler way to delete all messages in queue without even reading or consuming them?
是的。您可以使用 MQ PCF Clear Queue 命令,只要没有应用程序为输入或输出打开队列。即 IPPROCS 和 OPPROCS 必须为零才能工作。 Morag 的 MQSC Clear 命令也是如此。
这是一个功能齐全的 Java MQ PCF 程序,用于清除队列。
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import com.ibm.mq.MQException;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.constants.CMQCFC;
import com.ibm.mq.headers.MQDataException;
import com.ibm.mq.headers.pcf.PCFMessage;
import com.ibm.mq.headers.pcf.PCFMessageAgent;
/**
 * Program Name
 *  MQClearQueue01
 *
 * Description
 *  This java class issues a PCF "Clear Q" command for a queue to delete all messages
 *  in the queue of a remote queue manager.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * Exit Status
 *  0 - the queue was cleared
 *  1 - the command-line parameters were missing or invalid
 *  2 - connecting to the queue manager or clearing the queue failed
 *
 * @author Roger Lacroix
 */
public class MQClearQueue01
{
    /**
     * Timestamp layout prefixed to every log line. SimpleDateFormat is not
     * thread-safe; this class never starts additional threads.
     */
    private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

    /** Command-line switches mapped to their values, e.g. "-q" to the queue name. */
    private final Hashtable<String,String> params;

    /** Connection properties handed to the MQQueueManager constructor. */
    private final Hashtable<String,Object> mqht;

    public MQClearQueue01()
    {
        super();
        params = new Hashtable<String,String>();
        mqht = new Hashtable<String,Object>();
    }

    /**
     * Make sure the required parameters are present and the port is numeric.
     * @return true when every required switch was supplied and "-p" parses as an int
     */
    private boolean allParamsPresent()
    {
        boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                    params.containsKey("-c") && params.containsKey("-m") &&
                    params.containsKey("-q") &&
                    params.containsKey("-u") && params.containsKey("-x");
        if (b)
        {
            try
            {
                Integer.parseInt(params.get("-p"));
            }
            catch (NumberFormatException e)
            {
                b = false;
            }
        }
        return b;
    }

    /**
     * Extract the command-line parameters and initialize the MQ HashTable.
     * @param args switch/value pairs as given on the command line
     * @throws IllegalArgumentException if the arguments are not switch/value
     *         pairs, a required switch is missing, or the port is not numeric
     */
    private void init(String[] args) throws IllegalArgumentException
    {
        if (args.length == 0 || (args.length % 2) != 0)
        {
            throw new IllegalArgumentException("expected switch/value pairs");
        }
        for (int i = 0; i < args.length; i += 2)
        {
            params.put(args[i], args[i + 1]);
        }

        if (!allParamsPresent())
        {
            throw new IllegalArgumentException("missing or invalid parameter");
        }

        // allParamsPresent() has already proven that "-p" parses, so no
        // fallback port is needed here.
        int port = Integer.parseInt(params.get("-p"));

        mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
        mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
        mqht.put(CMQC.PORT_PROPERTY, Integer.valueOf(port));
        mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
        mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

        // I don't want to see MQ exceptions at the console.
        MQException.log = null;
    }

    /**
     * Handle connecting to the queue manager, issuing PCF command then
     * looping through PCF response messages and disconnecting from
     * the queue manager.
     * @return true when at least one response arrived and every response
     *         reported MQCC_OK; false on any failure or exception
     */
    private boolean doPCF()
    {
        MQQueueManager qMgr = null;
        PCFMessageAgent agent = null;
        boolean cleared = false;

        String qMgrName = params.get("-m");
        String queueName = params.get("-q");

        try
        {
            qMgr = new MQQueueManager(qMgrName, mqht);
            MQClearQueue01.logger("successfully connected to "+ qMgrName);

            agent = new PCFMessageAgent(qMgr);
            MQClearQueue01.logger("successfully created agent");

            // https://www.ibm.com/support/knowledgecenter/SSFKSJ_latest/com.ibm.mq.ref.adm.doc/q087420_.html
            PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CLEAR_Q);
            request.addParameter(CMQC.MQCA_Q_NAME, queueName);

            PCFMessage[] responses = agent.send(request);
            MQClearQueue01.logger("responses.length="+responses.length);

            cleared = responses.length > 0;
            for (PCFMessage response : responses)
            {
                if (response.getCompCode() == CMQC.MQCC_OK)
                {
                    MQClearQueue01.logger("Successfully cleared queue '"+queueName+"' of messages.");
                }
                else
                {
                    cleared = false;
                    MQClearQueue01.logger("Error: Failed to clear queue '"+queueName+"' of messages.");
                }
            }
        }
        catch (MQException e)
        {
            MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
        }
        catch (IOException e)
        {
            MQClearQueue01.logger("IOException:" +e.getLocalizedMessage());
        }
        catch (MQDataException e)
        {
            MQClearQueue01.logger("MQDataException:" +e.getLocalizedMessage());
        }
        finally
        {
            // Each disconnect gets its own try so that a failure releasing
            // the agent does not prevent the queue manager disconnect.
            try
            {
                if (agent != null)
                {
                    agent.disconnect();
                    MQClearQueue01.logger("disconnected from agent");
                }
            }
            catch (MQDataException e)
            {
                MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }

            try
            {
                if (qMgr != null)
                {
                    qMgr.disconnect();
                    MQClearQueue01.logger("disconnected from "+ qMgrName);
                }
            }
            catch (MQException e)
            {
                MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
            }
        }
        return cleared;
    }

    /**
     * A simple logger method. Writes a timestamp, the caller's simple class
     * name, the caller's method name and the text to standard output.
     * @param data text to log
     */
    public static void logger(String data)
    {
        // Index 2 is the caller of logger(): 0 is getStackTrace, 1 is logger itself.
        // Captured once so class and method name come from the same frame.
        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
        String className = caller.getClassName();

        // Remove the package info.
        if ( (className != null) && (className.lastIndexOf('.') != -1) )
        {
            className = className.substring(className.lastIndexOf('.')+1);
        }

        System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+caller.getMethodName()+": "+data);
    }

    /**
     * Parses the command line, clears the queue and exits with a status that
     * reflects the outcome: 0 cleared, 1 bad usage, 2 clear failed.
     * @param args switch/value pairs, see the usage message
     */
    public static void main(String[] args)
    {
        MQClearQueue01 mqcq = new MQClearQueue01();
        boolean cleared = false;

        try
        {
            mqcq.init(args);
            cleared = mqcq.doPCF();
        }
        catch (IllegalArgumentException e)
        {
            MQClearQueue01.logger("Usage: java MQClearQueue01 -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
            System.exit(1);
        }

        // Report failure to the calling shell instead of always exiting 0.
        System.exit(cleared ? 0 : 2);
    }
}