C#:使用 Read() 方法时的 IBM MQ 'unlock' 消息
C#: IBM MQ 'unlock' message when using Read() method
我的代码:
// Create an empty MQMessage that will receive the message data
MQMessage message = new MQMessage();
// Create the WebSphereMQConnection wrapper (its definition is not part of this snippet).
// NOTE(review): the init* arguments presumably carry the queue manager, channel,
// connection and queue names - confirm against the wrapper's constructor.
WebSphereMQConnection mqRequestConnection = new WebSphereMQConnection(initQMName, initQChannel, initQConnection, initQName, string.Empty, string.Empty);
mqRequestConnection.Open();
// NOTE(review): Read() presumably fills 'message'; the get/browse options it uses are not visible here
mqRequestConnection.Read(message);
// Read the whole message payload (MessageLength bytes) as a string
string body = message.ReadString(message.MessageLength);
return body;
此代码是控制台应用程序的一部分,按预期顺序浏览队列并读取每条消息。它解析平面文件中的字符串内容。
不过,Read() 方法似乎还会锁定消息,直到程序关闭为止。即使我在循环中运行该程序、按顺序读取所有消息,这些消息似乎也要等到程序完全关闭后才会被 'release'(释放)。
我尝试了 Gets 和 Puts、Dispose、Backout 等,除了停止整个控制台应用程序的执行之外似乎没有任何效果。
// Quoted from the question: create and open the WebSphereMQConnection wrapper, then call its Read()
WebSphereMQConnection mqRequestConnection = new WebSphereMQConnection(initQMName, initQChannel, initQConnection, initQName, string.Empty, string.Empty);
mqRequestConnection.Open();
// NOTE(review): Read() belongs to the wrapper class, whose implementation is not shown here
mqRequestConnection.Read(message);
这到底是什么东西?它肯定不是 IBM MQ 自带的 class 和方法。所以,它是一个自行开发(homegrown)的 class。
所以,Read() 方法实际上执行的是一次 'browse'(浏览)。这个方法的命名并不好。我敢打赌,您的 Read() 方法在 MQGET 上使用了 MQGMO_LOCK 选项。
您为什么不摆脱 WebSphereMQConnection class 并只编写纯 IBM MQ .NET 代码?
这是一个 MQ CS .NET 示例程序,用于在没有任何消息锁定的情况下浏览队列:
using System;
using IBM.WMQ;
/// <summary> Program Name
/// MQTest62B
///
/// Description
/// This C# class will connect to a remote queue manager
/// and get (browse) a message from a queue using a managed .NET environment.
///
/// Sample Command Line Parameters
/// -h 127.0.0.1 -p 1415 -c TEST.CHL -m MQWT1 -q TEST.Q1
/// </summary>
/// <author> Roger Lacroix, Capitalware Inc.
/// </author>
namespace MQTest62
{
/// <summary>
/// Sample program that connects to a queue manager using the settings given on
/// the command line and browses (reads without removing) the messages on a
/// queue, printing the body of each one to the console.
/// </summary>
public class MQTest62B
{
    // Command-line switches mapped to their values, e.g. "-h" -> "127.0.0.1".
    private System.Collections.Generic.Dictionary<string, string> inParms = null;
    private string qManager;
    private string outputQName;

    // NOTE(review): hard-coded sample credentials; real code should obtain
    // these from configuration or a secret store rather than from source.
    private string userID = "tester";
    private string password = "barney";

    /// <summary> The constructor. </summary>
    public MQTest62B()
    {
    }

    /// <summary> Make sure the required parameters are present and the port is an integer.</summary>
    /// <returns>
    /// true when -h, -p, -c, -m and -q were all supplied and the -p value
    /// parses as a 32-bit integer; otherwise false.
    /// </returns>
    private bool allParamsPresent()
    {
        string[] required = { "-h", "-p", "-c", "-m", "-q" };
        foreach (string key in required)
        {
            if (!inParms.ContainsKey(key))
            {
                return false;
            }
        }

        // TryParse rejects both malformed and out-of-range values; the
        // previous Parse/catch(FormatException) let OverflowException escape.
        int port;
        return int.TryParse(inParms["-p"], out port);
    }

    /// <summary> Extract the command-line parameters and initialize the MQ variables.</summary>
    /// <param name="args"> Alternating switch/value pairs from the command line. </param>
    /// <exception cref="System.ArgumentException">
    /// The argument list is empty, has an odd length, lacks a required switch,
    /// or carries a port that is not a valid integer.
    /// </exception>
    private void init(string[] args)
    {
        // Switches and values must come in pairs.
        if (args.Length == 0 || (args.Length % 2) != 0)
        {
            throw new System.ArgumentException();
        }

        inParms = new System.Collections.Generic.Dictionary<string, string>();
        for (int i = 0; i < args.Length; i += 2)
        {
            inParms[args[i]] = args[i + 1];
        }

        if (!allParamsPresent())
        {
            throw new System.ArgumentException();
        }

        qManager = inParms["-m"];
        outputQName = inParms["-q"];

        // Set up MQ environment
        MQEnvironment.Hostname = inParms["-h"];
        MQEnvironment.Channel = inParms["-c"];
        // Safe to Parse here: allParamsPresent() already validated the value.
        MQEnvironment.Port = int.Parse(inParms["-p"]);
        if (userID != null)
        {
            MQEnvironment.UserId = userID;
        }
        if (password != null)
        {
            MQEnvironment.Password = password;
        }
    }

    /// <summary>
    /// Connect, open the queue for browsing, run the browse loop, then close
    /// the queue and disconnect. MQ and I/O errors are reported on the console.
    /// </summary>
    private void testReceive()
    {
        MQQueueManager qMgr = null;
        MQQueue inQ = null;
        // Browse-only open: no MQOO_INPUT_* option is requested.
        int openOptions = MQC.MQOO_BROWSE | MQC.MQOO_FAIL_IF_QUIESCING;

        try
        {
            qMgr = new MQQueueManager(qManager);
            System.Console.Out.WriteLine("MQTest62B successfully connected to " + qManager);

            inQ = qMgr.AccessQueue(outputQName, openOptions, null, null, null); // no alternate user id
            System.Console.Out.WriteLine("MQTest62B successfully opened " + outputQName);

            testLoop(inQ);
        }
        catch (MQException mqex)
        {
            System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode);
        }
        catch (System.IO.IOException ioex)
        {
            System.Console.Out.WriteLine("MQTest62B ioex=" + ioex);
        }
        finally
        {
            // Report "closed"/"disconnected" only for handles that were
            // actually obtained, so a failed connect does not log a bogus close.
            try
            {
                if (inQ != null)
                {
                    inQ.Close();
                    System.Console.Out.WriteLine("MQTest62B closed: " + outputQName);
                }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode);
            }

            try
            {
                if (qMgr != null)
                {
                    qMgr.Disconnect();
                    System.Console.Out.WriteLine("MQTest62B disconnected from " + qManager);
                }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode);
            }
        }
    }

    /// <summary>
    /// Repeatedly browse the next message on the queue and print its body.
    /// The loop keeps polling while the queue reports "no message available"
    /// and stops on any other MQ error.
    /// </summary>
    /// <param name="inQ"> Queue already opened with the browse option. </param>
    private void testLoop(MQQueue inQ)
    {
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options |= MQC.MQGMO_BROWSE_NEXT | MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.WaitInterval = 500; // 1/2 second wait time or MQC.MQEI_UNLIMITED

        bool keepGoing = true;
        while (keepGoing)
        {
            try
            {
                // A fresh message object is used for every Get call.
                MQMessage msg = new MQMessage();
                inQ.Get(msg, gmo);
                System.Console.Out.WriteLine("Message Data: " + msg.ReadString(msg.MessageLength));
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQTest62B CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                // An empty queue is not an error - keep polling. Anything
                // else is treated as severe and ends the loop.
                if (mqex.Reason != MQC.MQRC_NO_MSG_AVAILABLE)
                {
                    keepGoing = false;
                }
            }
            catch (System.IO.IOException ioex)
            {
                System.Console.Out.WriteLine("MQTest62B ioex=" + ioex);
            }
        }
    }

    /// <summary> main line</summary>
    /// <param name="args"> Command-line switch/value pairs; see the usage text below. </param>
    // [STAThread]
    public static void Main(string[] args)
    {
        MQTest62B app = new MQTest62B();

        try
        {
            app.init(args);
            app.testReceive();
        }
        catch (System.ArgumentException)
        {
            System.Console.Out.WriteLine("Usage: MQTest62B -h host -p port -c channel -m QueueManagerName -q QueueName");
            System.Environment.Exit(1);
        }
        catch (MQException e)
        {
            System.Console.Out.WriteLine(e);
            System.Environment.Exit(1);
        }

        System.Environment.Exit(0);
    }
}
}
我的代码:
// Create an empty MQMessage that will receive the message data
MQMessage message = new MQMessage();
// Create the WebSphereMQConnection wrapper (its definition is not part of this snippet).
// NOTE(review): the init* arguments presumably carry the queue manager, channel,
// connection and queue names - confirm against the wrapper's constructor.
WebSphereMQConnection mqRequestConnection = new WebSphereMQConnection(initQMName, initQChannel, initQConnection, initQName, string.Empty, string.Empty);
mqRequestConnection.Open();
// NOTE(review): Read() presumably fills 'message'; the get/browse options it uses are not visible here
mqRequestConnection.Read(message);
// Read the whole message payload (MessageLength bytes) as a string
string body = message.ReadString(message.MessageLength);
return body;
此代码是控制台应用程序的一部分,按预期顺序浏览队列并读取每条消息。它解析平面文件中的字符串内容。
不过,Read() 方法似乎还会锁定消息,直到程序关闭为止。即使我在循环中运行该程序、按顺序读取所有消息,这些消息似乎也要等到程序完全关闭后才会被 'release'(释放)。
我尝试了 Gets 和 Puts、Dispose、Backout 等,除了停止整个控制台应用程序的执行之外似乎没有任何效果。
// Quoted from the question: create and open the WebSphereMQConnection wrapper, then call its Read()
WebSphereMQConnection mqRequestConnection = new WebSphereMQConnection(initQMName, initQChannel, initQConnection, initQName, string.Empty, string.Empty);
mqRequestConnection.Open();
// NOTE(review): Read() belongs to the wrapper class, whose implementation is not shown here
mqRequestConnection.Read(message);
这到底是什么东西?它肯定不是 IBM MQ 自带的 class 和方法。所以,它是一个自行开发(homegrown)的 class。
所以,Read() 方法实际上执行的是一次 'browse'(浏览)。这个方法的命名并不好。我敢打赌,您的 Read() 方法在 MQGET 上使用了 MQGMO_LOCK 选项。
您为什么不摆脱 WebSphereMQConnection class 并只编写纯 IBM MQ .NET 代码?
这是一个 MQ CS .NET 示例程序,用于在没有任何消息锁定的情况下浏览队列:
using System;
using IBM.WMQ;
/// <summary> Program Name
/// MQTest62B
///
/// Description
/// This C# class will connect to a remote queue manager
/// and get (browse) a message from a queue using a managed .NET environment.
///
/// Sample Command Line Parameters
/// -h 127.0.0.1 -p 1415 -c TEST.CHL -m MQWT1 -q TEST.Q1
/// </summary>
/// <author> Roger Lacroix, Capitalware Inc.
/// </author>
namespace MQTest62
{
/// <summary>
/// Sample program that connects to a queue manager using the settings given on
/// the command line and browses (reads without removing) the messages on a
/// queue, printing the body of each one to the console.
/// </summary>
public class MQTest62B
{
    // Command-line switches mapped to their values, e.g. "-h" -> "127.0.0.1".
    private System.Collections.Generic.Dictionary<string, string> inParms = null;
    private string qManager;
    private string outputQName;

    // NOTE(review): hard-coded sample credentials; real code should obtain
    // these from configuration or a secret store rather than from source.
    private string userID = "tester";
    private string password = "barney";

    /// <summary> The constructor. </summary>
    public MQTest62B()
    {
    }

    /// <summary> Make sure the required parameters are present and the port is an integer.</summary>
    /// <returns>
    /// true when -h, -p, -c, -m and -q were all supplied and the -p value
    /// parses as a 32-bit integer; otherwise false.
    /// </returns>
    private bool allParamsPresent()
    {
        string[] required = { "-h", "-p", "-c", "-m", "-q" };
        foreach (string key in required)
        {
            if (!inParms.ContainsKey(key))
            {
                return false;
            }
        }

        // TryParse rejects both malformed and out-of-range values; the
        // previous Parse/catch(FormatException) let OverflowException escape.
        int port;
        return int.TryParse(inParms["-p"], out port);
    }

    /// <summary> Extract the command-line parameters and initialize the MQ variables.</summary>
    /// <param name="args"> Alternating switch/value pairs from the command line. </param>
    /// <exception cref="System.ArgumentException">
    /// The argument list is empty, has an odd length, lacks a required switch,
    /// or carries a port that is not a valid integer.
    /// </exception>
    private void init(string[] args)
    {
        // Switches and values must come in pairs.
        if (args.Length == 0 || (args.Length % 2) != 0)
        {
            throw new System.ArgumentException();
        }

        inParms = new System.Collections.Generic.Dictionary<string, string>();
        for (int i = 0; i < args.Length; i += 2)
        {
            inParms[args[i]] = args[i + 1];
        }

        if (!allParamsPresent())
        {
            throw new System.ArgumentException();
        }

        qManager = inParms["-m"];
        outputQName = inParms["-q"];

        // Set up MQ environment
        MQEnvironment.Hostname = inParms["-h"];
        MQEnvironment.Channel = inParms["-c"];
        // Safe to Parse here: allParamsPresent() already validated the value.
        MQEnvironment.Port = int.Parse(inParms["-p"]);
        if (userID != null)
        {
            MQEnvironment.UserId = userID;
        }
        if (password != null)
        {
            MQEnvironment.Password = password;
        }
    }

    /// <summary>
    /// Connect, open the queue for browsing, run the browse loop, then close
    /// the queue and disconnect. MQ and I/O errors are reported on the console.
    /// </summary>
    private void testReceive()
    {
        MQQueueManager qMgr = null;
        MQQueue inQ = null;
        // Browse-only open: no MQOO_INPUT_* option is requested.
        int openOptions = MQC.MQOO_BROWSE | MQC.MQOO_FAIL_IF_QUIESCING;

        try
        {
            qMgr = new MQQueueManager(qManager);
            System.Console.Out.WriteLine("MQTest62B successfully connected to " + qManager);

            inQ = qMgr.AccessQueue(outputQName, openOptions, null, null, null); // no alternate user id
            System.Console.Out.WriteLine("MQTest62B successfully opened " + outputQName);

            testLoop(inQ);
        }
        catch (MQException mqex)
        {
            System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode);
        }
        catch (System.IO.IOException ioex)
        {
            System.Console.Out.WriteLine("MQTest62B ioex=" + ioex);
        }
        finally
        {
            // Report "closed"/"disconnected" only for handles that were
            // actually obtained, so a failed connect does not log a bogus close.
            try
            {
                if (inQ != null)
                {
                    inQ.Close();
                    System.Console.Out.WriteLine("MQTest62B closed: " + outputQName);
                }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode);
            }

            try
            {
                if (qMgr != null)
                {
                    qMgr.Disconnect();
                    System.Console.Out.WriteLine("MQTest62B disconnected from " + qManager);
                }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode);
            }
        }
    }

    /// <summary>
    /// Repeatedly browse the next message on the queue and print its body.
    /// The loop keeps polling while the queue reports "no message available"
    /// and stops on any other MQ error.
    /// </summary>
    /// <param name="inQ"> Queue already opened with the browse option. </param>
    private void testLoop(MQQueue inQ)
    {
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options |= MQC.MQGMO_BROWSE_NEXT | MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.WaitInterval = 500; // 1/2 second wait time or MQC.MQEI_UNLIMITED

        bool keepGoing = true;
        while (keepGoing)
        {
            try
            {
                // A fresh message object is used for every Get call.
                MQMessage msg = new MQMessage();
                inQ.Get(msg, gmo);
                System.Console.Out.WriteLine("Message Data: " + msg.ReadString(msg.MessageLength));
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQTest62B CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                // An empty queue is not an error - keep polling. Anything
                // else is treated as severe and ends the loop.
                if (mqex.Reason != MQC.MQRC_NO_MSG_AVAILABLE)
                {
                    keepGoing = false;
                }
            }
            catch (System.IO.IOException ioex)
            {
                System.Console.Out.WriteLine("MQTest62B ioex=" + ioex);
            }
        }
    }

    /// <summary> main line</summary>
    /// <param name="args"> Command-line switch/value pairs; see the usage text below. </param>
    // [STAThread]
    public static void Main(string[] args)
    {
        MQTest62B app = new MQTest62B();

        try
        {
            app.init(args);
            app.testReceive();
        }
        catch (System.ArgumentException)
        {
            System.Console.Out.WriteLine("Usage: MQTest62B -h host -p port -c channel -m QueueManagerName -q QueueName");
            System.Environment.Exit(1);
        }
        catch (MQException e)
        {
            System.Console.Out.WriteLine(e);
            System.Environment.Exit(1);
        }

        System.Environment.Exit(0);
    }
}
}