以破坏性方式从 WebSphere 读取消息
Read messages from WebSphere in destructive manner
我正在尝试从 WebSphere 执行破坏性读取,即我读取了应该立即从队列中删除的消息。
我编写的代码运行良好,直到我开始弄乱消息。例如,最后一个是"add one message, read it, wait on empty queue, then add two messages"。在我的场景中,这个程序应该读取第一条消息,等到出现某些内容,然后再读取它。
但是,问题是我遇到了卡住的情况。我在队列中有一条消息,但如果使用 BROWSE 或 CURSOR,我无法阅读。这是我的代码:
MQEnvironment.UserId = _queueSettings.UserName;
MQEnvironment.Password = _queueSettings.Password;
var manager = new MQQueueManager(_queueSettings.QueueManagerName, _queueSettings.ChannelName, _queueSettings.ConnectionName);
var queue = manager.AccessQueue(_queueSettings.QueueName, MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INPUT_SHARED | MQC.MQOO_BROWSE);
var browseFirstOptions = new MQGetMessageOptions { Options = MQC.MQGMO_BROWSE_FIRST };
var cursorOptions = new MQGetMessageOptions { Options = MQC.MQGMO_MSG_UNDER_CURSOR };
var currentOptions = browseFirstOptions;
while (!cancellationToken.IsCancellationRequested)
{
var logger = _contextlessLogger.ForContext("requestId", Guid.NewGuid());
try
{
var msg = new MQMessage();
queue.Get(msg, currentOptions);
if (currentOptions == browseFirstOptions)
{
currentOptions = cursorOptions;
continue;
}
string messageText = msg.ReadString(msg.MessageLength);
RunProcessingTask(logger, messageText);
}
catch (MQException ex) when (IsNoMessagesException(ex) && currentOptions != browseFirstOptions)
{
currentOptions = browseFirstOptions;
}
catch (MQException ex) when (IsNoMessagesException(ex))
{
const int sleepIntervalMs = 5000;
_contextlessLogger.Information("No messages in the queue. Sleeping for {sleepIntervalMs}ms", sleepIntervalMs);
await Task.Delay(sleepIntervalMs);
}
catch (Exception ex)
{
logger.Error(ex, "Unexpected error occured");
}
}
private static bool IsNoMessagesException(MQException exception) =>
exception.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE
|| exception.ReasonCode == MQC.MQRC_NO_MSG_UNDER_CURSOR;
我只收到 2033
或 2034
,而我在 UI 中看到有一条消息。
怎么做到的?也许我做错了?
我也添加了 Java 标签,因为它们的代码根本没有区别,例如我使用 this code 作为参考。
while I can see in UI that there is a message.
我假设您看到当前队列深度为 1。
如果将消息放入队列,IBM MQ 会立即增加当前队列深度,即 在 提交事务之前。但是您只能在事务提交后收到消息。
我正在尝试从 WebSphere 执行破坏性读取,即我读取了应该立即从队列中删除的消息。
我编写的代码运行良好,直到我开始弄乱消息。例如,最后一个是"add one message, read it, wait on empty queue, then add two messages"。在我的场景中,这个程序应该读取第一条消息,等到出现某些内容,然后再读取它。
但是,问题是我遇到了卡住的情况。我在队列中有一条消息,但如果使用 BROWSE 或 CURSOR,我无法阅读。这是我的代码:
MQEnvironment.UserId = _queueSettings.UserName;
MQEnvironment.Password = _queueSettings.Password;
var manager = new MQQueueManager(_queueSettings.QueueManagerName, _queueSettings.ChannelName, _queueSettings.ConnectionName);
var queue = manager.AccessQueue(_queueSettings.QueueName, MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INPUT_SHARED | MQC.MQOO_BROWSE);
var browseFirstOptions = new MQGetMessageOptions { Options = MQC.MQGMO_BROWSE_FIRST };
var cursorOptions = new MQGetMessageOptions { Options = MQC.MQGMO_MSG_UNDER_CURSOR };
var currentOptions = browseFirstOptions;
while (!cancellationToken.IsCancellationRequested)
{
var logger = _contextlessLogger.ForContext("requestId", Guid.NewGuid());
try
{
var msg = new MQMessage();
queue.Get(msg, currentOptions);
if (currentOptions == browseFirstOptions)
{
currentOptions = cursorOptions;
continue;
}
string messageText = msg.ReadString(msg.MessageLength);
RunProcessingTask(logger, messageText);
}
catch (MQException ex) when (IsNoMessagesException(ex) && currentOptions != browseFirstOptions)
{
currentOptions = browseFirstOptions;
}
catch (MQException ex) when (IsNoMessagesException(ex))
{
const int sleepIntervalMs = 5000;
_contextlessLogger.Information("No messages in the queue. Sleeping for {sleepIntervalMs}ms", sleepIntervalMs);
await Task.Delay(sleepIntervalMs);
}
catch (Exception ex)
{
logger.Error(ex, "Unexpected error occured");
}
}
private static bool IsNoMessagesException(MQException exception) =>
exception.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE
|| exception.ReasonCode == MQC.MQRC_NO_MSG_UNDER_CURSOR;
我只收到 2033
或 2034
,而我在 UI 中看到有一条消息。
怎么做到的?也许我做错了?
我也添加了 Java 标签,因为它们的代码根本没有区别,例如我使用 this code 作为参考。
while I can see in UI that there is a message.
我假设您看到当前队列深度为 1。
如果将消息放入队列,IBM MQ 会立即增加当前队列深度,即 在 提交事务之前。但是您只能在事务提交后收到消息。