WebSphere MQ/C# 订阅:主题 GET 永远挂起
WebSphere MQ/C# subscribe: topic GET hangs forever
我正在尝试在 C#/.Net 应用程序中实现 MQ publish/subscribe。
我已按照本教程中的说明进行操作:
https://tekslate.com/publish-subscribe-in-websphere-mq-series/
- a) 队列管理器:QM
- b) TCP 侦听器 运行,端口:1420
- c) 主题名称:NEWS.SPORTS.CRICKET
- d) 订阅名称还有:NEWS.SPORTS.CRICKET
- e) 主题字符串:NEW/SPORTS/CRICKET
- f) 目标队列:SportsQ
我成功地能够 "test publish" 在 MQ Explorer 中。
我在订阅中看到 "message count = 1"。
我在 SportsQ
中看到 "queue depth = 1"
我可以连接到 QM,我可以访问该主题...但是当我执行 "topic.Get(message")
[ 时它挂起=41=]
问:为什么 MQ "Get()" 挂了?????
代码:
using IBM.WMQ;
using System;
using System.Collections;
namespace HelloSubscribe
{
class Program
{
static void Main(string[] args)
{
string qmName = "QM";
string hostName = "localhost";
string strPort = "1420";
string channelName = "SYSTEM.DEF.SVRCONN";
string transport = MQC.TRANSPORT_MQSERIES_CLIENT;
Hashtable connectionProperties = new Hashtable();
connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);
MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);
string topicObject = null;
string topicString = "NEWS/SPORTS/CRICKET";
string subscriptionName = "NEWS.SPORTS.CRICKET";
string topicName = "NEWS.SPORTS.CRICKET";
int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
MQTopic destForGet = mqQueueManager.AccessTopic(topicString, null, openOptionsForGet, null, subscriptionName);
MQMessage messageForGet = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.WaitInterval = 1000; // wait 60 seconds
destForGet.Get(messageForGet, gmo);
string msg = messageForGet.ReadLine();
destForGet.Close();
mqQueueManager.Disconnect();
mqQueueManager.Close();
}
}
}
我是运行 WebSphere MQ 7.5,使用的安装版本是amqdnet.dll,和Visual Studio 2015.
MQ 管理订阅
您参考的教程让您设置 MQ 管理订阅以将主题字符串订阅到特定的 MQ 队列,这样,发布到与订阅的主题字符串匹配的主题的任何消息都将被放入您指定的队列中MQ 管理订阅,例如 SportsQ
,要从队列中读取消息,您会将其视为队列而不是主题,并使用 AccessQueue
方法访问它,例如:
string queueName = "SportsQ";
int openOptionsForGet = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue destForGet = mqQueueManager.AccessQueue(queueName, openOptionsForGet, null, null, null);
托管持久订阅
您在提供的代码中演示的内容称为托管持久订阅,在后台 MQ 将创建一个前缀为 SYSTEM.MANAGED.DURABLE.<uniq 16 character HEX value>
的队列并将其订阅到主题字符串。如果您断开连接然后使用相同的 "subscription name" 恢复,那么您将连接到相同的托管队列并接收您未连接时发布的任何消息。
请注意,除了称为保留发布的内容(MQ 会在其中为未来的订阅者保存最新发布)之外,您在订阅之前不会收到发布到主题字符串的任何消息,这就是为什么您看不到您在创建托管持久订阅之前通过 MQ Explorer 发布的消息。
根据您提供的内容,您应该能够向主题字符串发布一条新消息,您的应用程序会收到它。
非托管持久订阅
另一种选择是删除您创建的 MQ 管理订阅并更改您的程序以在您提供队列目标的位置创建非托管订阅,例如:
string queueName = "SportsQ";
int QueueOpenOptionsForGet = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
int TopicOpenOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
MQQueue destQueue = mqQueueManager.AccessQueue(queueName, QueueOpenOptionsForGet);
MQTopic destForGet = mqQueueManager.AccessTopic(destQueue, topicString, null, openOptionsForGet, null, subscriptionName);
这会导致创建一个 API 订阅,将主题字符串链接到您作为目标提供的队列。
就像 MQ 管理订阅一样,您随后可以将其视为一个队列,或者您可以恢复订阅。
无需知道队列名称即可恢复非托管持久订阅
您仍需要调用将目标作为第一个参数的 AccessTopic 重载,但您传入 NULL 而不是 MQDestination,程序将恢复已创建的非托管订阅,而无需知道队列名称。我能够编译并测试以下是否有效:
using IBM.WMQ;
using System;
using System.Collections;
namespace HelloSubscribe
{
class Program
{
static void Main(string[] args)
{
string qmName = "QM";
string hostName = "localhost";
string strPort = "1420";
string channelName = "SYSTEM.DEF.SVRCONN";
string transport = MQC.TRANSPORT_MQSERIES_CLIENT;
Hashtable connectionProperties = new Hashtable();
connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);
MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);
string topicString = "NEWS/SPORTS/CRICKET";
string subscriptionName = "NEWS.SPORTS.CRICKET";
int openOptionsForGet = MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
MQTopic destForGet = mqQueueManager.AccessTopic(null, topicString, null, openOptionsForGet, null, subscriptionName);
MQMessage messageForGet = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.WaitInterval = 60000; // wait 60 seconds
destForGet.Get(messageForGet, gmo);
string msg = messageForGet.ReadLine();
System.Console.WriteLine("Received message data : " + msg);
destForGet.Close();
mqQueueManager.Disconnect();
mqQueueManager.Close();
}
}
}
另请注意,您甚至不需要 topicString,它也可以为 null,只需 subscriptionName 即可恢复它:
MQTopic destForGet = mqQueueManager.AccessTopic(null, null, null, openOptionsForGet, null, subscriptionName);
一些其他注意事项,教程中的示例提供了混合大小写的队列名称,不建议这样做,因为在许多地方,如果您不小心,IBM MQ 会将东西折叠为大写。推荐的最佳做法是对 IBM MQ 对象名称(队列等)使用大写。
WaitInterval 以毫秒为单位,因此将其设置为 1000 将等待 1 秒而不是 60 秒。
我通过指定目标队列让它工作了:
...
string topicString = "NEWS/SPORTS/CRICKET";
string subscriptionName = "NEWS.SPORTS.CRICKET";
string topicName = "NEWS.SPORTS.CRICKET";
MQDestination unmanagedDest = mqQueueManager.AccessQueue("SportsQ", MQC.MQOO_INPUT_EXCLUSIVE | MQC.MQOO_FAIL_IF_QUIESCING);
int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, null, openOptionsForGet, null, subscriptionName); // Hangs in "Get()"
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, null, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); // MQRC_SUB_NAME_ERROR
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, topicString, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); // MQRC_SUB_NAME_ERROR
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, topicString, openOptionsForGet, null, subscriptionName); // Hangs in "Get()"
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, "SportsQ", openOptionsForGet, null, subscriptionName); // Hangs
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, "SportsQ", MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); // MQRC_SUB_NAME_ERROR
// MQTopic destForGet = mqQueueManager.AccessTopic(unmanagedDest, topicName, null, openOptionsForGet); // MQRC_SUB_NAME_ERROR
MQTopic destForGet = mqQueueManager.AccessTopic(unmanagedDest, topicName, null, openOptionsForGet, null, subscriptionName); // <-- this works!
MQMessage messageForGet = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.WaitInterval = 1000; // wait 60 seconds
destForGet.Get(messageForGet, gmo); // <-- No hang, if message present
string msg = messageForGet.ReadLine();
...
我正在尝试在 C#/.Net 应用程序中实现 MQ publish/subscribe。
我已按照本教程中的说明进行操作:
https://tekslate.com/publish-subscribe-in-websphere-mq-series/
- a) 队列管理器:QM
- b) TCP 侦听器 运行,端口:1420
- c) 主题名称:NEWS.SPORTS.CRICKET
- d) 订阅名称还有:NEWS.SPORTS.CRICKET
- e) 主题字符串:NEW/SPORTS/CRICKET
- f) 目标队列:SportsQ
我成功地能够 "test publish" 在 MQ Explorer 中。 我在订阅中看到 "message count = 1"。 我在 SportsQ
中看到 "queue depth = 1"
我可以连接到 QM,我可以访问该主题...但是当我执行 "topic.Get(message")
[ 时它挂起=41=]
问:为什么 MQ "Get()" 挂了?????
代码:
using IBM.WMQ;
using System;
using System.Collections;
namespace HelloSubscribe
{
class Program
{
static void Main(string[] args)
{
string qmName = "QM";
string hostName = "localhost";
string strPort = "1420";
string channelName = "SYSTEM.DEF.SVRCONN";
string transport = MQC.TRANSPORT_MQSERIES_CLIENT;
Hashtable connectionProperties = new Hashtable();
connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);
MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);
string topicObject = null;
string topicString = "NEWS/SPORTS/CRICKET";
string subscriptionName = "NEWS.SPORTS.CRICKET";
string topicName = "NEWS.SPORTS.CRICKET";
int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
MQTopic destForGet = mqQueueManager.AccessTopic(topicString, null, openOptionsForGet, null, subscriptionName);
MQMessage messageForGet = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.WaitInterval = 1000; // wait 60 seconds
destForGet.Get(messageForGet, gmo);
string msg = messageForGet.ReadLine();
destForGet.Close();
mqQueueManager.Disconnect();
mqQueueManager.Close();
}
}
}
我是运行 WebSphere MQ 7.5,使用的安装版本是amqdnet.dll,和Visual Studio 2015.
MQ 管理订阅
您参考的教程让您设置 MQ 管理订阅以将主题字符串订阅到特定的 MQ 队列,这样,发布到与订阅的主题字符串匹配的主题的任何消息都将被放入您指定的队列中MQ 管理订阅,例如 SportsQ
,要从队列中读取消息,您会将其视为队列而不是主题,并使用 AccessQueue
方法访问它,例如:
string queueName = "SportsQ";
int openOptionsForGet = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue destForGet = mqQueueManager.AccessQueue(queueName, openOptionsForGet, null, null, null);
托管持久订阅
您在提供的代码中演示的内容称为托管持久订阅,在后台 MQ 将创建一个前缀为 SYSTEM.MANAGED.DURABLE.<uniq 16 character HEX value>
的队列并将其订阅到主题字符串。如果您断开连接然后使用相同的 "subscription name" 恢复,那么您将连接到相同的托管队列并接收您未连接时发布的任何消息。
请注意,除了称为保留发布的内容(MQ 会在其中为未来的订阅者保存最新发布)之外,您在订阅之前不会收到发布到主题字符串的任何消息,这就是为什么您看不到您在创建托管持久订阅之前通过 MQ Explorer 发布的消息。
根据您提供的内容,您应该能够向主题字符串发布一条新消息,您的应用程序会收到它。
非托管持久订阅
另一种选择是删除您创建的 MQ 管理订阅并更改您的程序以在您提供队列目标的位置创建非托管订阅,例如:
string queueName = "SportsQ";
int QueueOpenOptionsForGet = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
int TopicOpenOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
MQQueue destQueue = mqQueueManager.AccessQueue(queueName, QueueOpenOptionsForGet);
MQTopic destForGet = mqQueueManager.AccessTopic(destQueue, topicString, null, openOptionsForGet, null, subscriptionName);
这会导致创建一个 API 订阅,将主题字符串链接到您作为目标提供的队列。
就像 MQ 管理订阅一样,您随后可以将其视为一个队列,或者您可以恢复订阅。
无需知道队列名称即可恢复非托管持久订阅
您仍需要调用将目标作为第一个参数的 AccessTopic 重载,但您传入 NULL 而不是 MQDestination,程序将恢复已创建的非托管订阅,而无需知道队列名称。我能够编译并测试以下是否有效:
using IBM.WMQ;
using System;
using System.Collections;
namespace HelloSubscribe
{
class Program
{
static void Main(string[] args)
{
string qmName = "QM";
string hostName = "localhost";
string strPort = "1420";
string channelName = "SYSTEM.DEF.SVRCONN";
string transport = MQC.TRANSPORT_MQSERIES_CLIENT;
Hashtable connectionProperties = new Hashtable();
connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);
MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);
string topicString = "NEWS/SPORTS/CRICKET";
string subscriptionName = "NEWS.SPORTS.CRICKET";
int openOptionsForGet = MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
MQTopic destForGet = mqQueueManager.AccessTopic(null, topicString, null, openOptionsForGet, null, subscriptionName);
MQMessage messageForGet = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.WaitInterval = 60000; // wait 60 seconds
destForGet.Get(messageForGet, gmo);
string msg = messageForGet.ReadLine();
System.Console.WriteLine("Received message data : " + msg);
destForGet.Close();
mqQueueManager.Disconnect();
mqQueueManager.Close();
}
}
}
另请注意,您甚至不需要 topicString,它也可以为 null,只需 subscriptionName 即可恢复它:
MQTopic destForGet = mqQueueManager.AccessTopic(null, null, null, openOptionsForGet, null, subscriptionName);
一些其他注意事项,教程中的示例提供了混合大小写的队列名称,不建议这样做,因为在许多地方,如果您不小心,IBM MQ 会将东西折叠为大写。推荐的最佳做法是对 IBM MQ 对象名称(队列等)使用大写。
WaitInterval 以毫秒为单位,因此将其设置为 1000 将等待 1 秒而不是 60 秒。
我通过指定目标队列让它工作了:
...
string topicString = "NEWS/SPORTS/CRICKET";
string subscriptionName = "NEWS.SPORTS.CRICKET";
string topicName = "NEWS.SPORTS.CRICKET";
MQDestination unmanagedDest = mqQueueManager.AccessQueue("SportsQ", MQC.MQOO_INPUT_EXCLUSIVE | MQC.MQOO_FAIL_IF_QUIESCING);
int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, null, openOptionsForGet, null, subscriptionName); // Hangs in "Get()"
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, null, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); // MQRC_SUB_NAME_ERROR
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, topicString, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); // MQRC_SUB_NAME_ERROR
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, topicString, openOptionsForGet, null, subscriptionName); // Hangs in "Get()"
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, "SportsQ", openOptionsForGet, null, subscriptionName); // Hangs
// MQTopic destForGet = mqQueueManager.AccessTopic(topicName, "SportsQ", MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); // MQRC_SUB_NAME_ERROR
// MQTopic destForGet = mqQueueManager.AccessTopic(unmanagedDest, topicName, null, openOptionsForGet); // MQRC_SUB_NAME_ERROR
MQTopic destForGet = mqQueueManager.AccessTopic(unmanagedDest, topicName, null, openOptionsForGet, null, subscriptionName); // <-- this works!
MQMessage messageForGet = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.WaitInterval = 1000; // wait 60 seconds
destForGet.Get(messageForGet, gmo); // <-- No hang, if message present
string msg = messageForGet.ReadLine();
...