Azure ServiceBus OnMessage 是否阻塞调用?
Azure ServiceBus OnMessage Blocking Call or Not?
我们正在利用 Azure ServiceBus 队列来处理大量客户端请求。然而,OnMessage
调用似乎是一个阻塞调用,但是如果它确实是一个阻塞调用,那么这个调用的阻塞是非常不一致的。
我试图完成的是从 Web 服务应用程序永久地观察队列(以允许从 运行 应用程序中挖掘指标)
我正在创建以下订阅:
protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
var client = GetClient(queueName, PollingTimeout);
var transformCallback = new Action<BrokeredMessage>((message) =>
{
try
{
var request = message.ToQueueRequest();
if (callback(request))
{
message.Complete();
}
else
{
message.Abandon();
}
}
catch (Exception ex)
{
//TODO: Log the error
message.Abandon();
}
});
var options = new OnMessageOptions
{
MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
AutoComplete = false
};
options.ExceptionReceived += OnMessageError;
client.OnMessage(transformCallback, options);
}
如果我只调用一次订阅,应用程序将停止监视队列,从而停止处理消息。但是,如果我在我的订阅调用周围放置一个 while 循环。因此,我非常犹豫地写了下面的代码片段,以便在 OnMessage
完成后重新订阅。
protected void MonitorQueue()
{
IsRunning = true;
while (IsRunning)
{
try
{
Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
QueueClient.Subscribe(QueueName, Processor);
Log.Info("MonitoringThread: OnMessage ended logging for {0}", QueueName);
}
catch (Exception ex)
{
IsRunning = false;
Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
}
if (SleepBeforeReinit > 0 && IsRunning)
{
Thread.Sleep(SleepBeforeReinit);
}
}
}
这解决了消息由于未被提取而过期为死信的问题,但这会导致其他问题。
由于 OnMessage 是一项计费操作,当我看到日志文件告诉我队列开始和结束的间隔不到一秒并且我的日志文件的大小增加时,我很担心 非常 迅速.
我将 MessagingFactory
设置为 OperationTimeout
1 天,但这似乎并没有像我预期的那样影响订阅打开/关闭状态的频率。
我已经看到很多示例以工作者角色的身份执行此操作,但是这不会完成我们正在尝试做的事情。我目前正在从我们的网络应用程序的 Global.asax.cs 连接它。非常感谢任何建议!
您的第一种方法是正确的,MaxThreadsPerQueue 的值决定了有多少线程可用于处理您的消息。听起来好像这些线程正在被用完(也许回调处理您的消息需要一些时间?)。
您应该考虑使用 QueueClient.OnMessageAsync Method 来接收消息。
OnMessage 和 OnMessageAsync 不阻止调用。这些需要实例化一次,并将继续订阅队列,直到应用程序终止。
有关详细信息,请参阅相关 post:
我们正在利用 Azure ServiceBus 队列来处理大量客户端请求。然而,OnMessage
调用似乎是一个阻塞调用,但是如果它确实是一个阻塞调用,那么这个调用的阻塞是非常不一致的。
我试图完成的是从 Web 服务应用程序永久地观察队列(以允许从 运行 应用程序中挖掘指标)
我正在创建以下订阅:
protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
var client = GetClient(queueName, PollingTimeout);
var transformCallback = new Action<BrokeredMessage>((message) =>
{
try
{
var request = message.ToQueueRequest();
if (callback(request))
{
message.Complete();
}
else
{
message.Abandon();
}
}
catch (Exception ex)
{
//TODO: Log the error
message.Abandon();
}
});
var options = new OnMessageOptions
{
MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
AutoComplete = false
};
options.ExceptionReceived += OnMessageError;
client.OnMessage(transformCallback, options);
}
如果我只调用一次订阅,应用程序将停止监视队列,从而停止处理消息。但是,如果我在我的订阅调用周围放置一个 while 循环。因此,我非常犹豫地写了下面的代码片段,以便在 OnMessage
完成后重新订阅。
protected void MonitorQueue()
{
IsRunning = true;
while (IsRunning)
{
try
{
Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
QueueClient.Subscribe(QueueName, Processor);
Log.Info("MonitoringThread: OnMessage ended logging for {0}", QueueName);
}
catch (Exception ex)
{
IsRunning = false;
Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
}
if (SleepBeforeReinit > 0 && IsRunning)
{
Thread.Sleep(SleepBeforeReinit);
}
}
}
这解决了消息由于未被提取而过期为死信的问题,但这会导致其他问题。
由于 OnMessage 是一项计费操作,当我看到日志文件告诉我队列开始和结束的间隔不到一秒并且我的日志文件的大小增加时,我很担心 非常 迅速.
我将 MessagingFactory
设置为 OperationTimeout
1 天,但这似乎并没有像我预期的那样影响订阅打开/关闭状态的频率。
我已经看到很多示例以工作者角色的身份执行此操作,但是这不会完成我们正在尝试做的事情。我目前正在从我们的网络应用程序的 Global.asax.cs 连接它。非常感谢任何建议!
您的第一种方法是正确的,MaxThreadsPerQueue 的值决定了有多少线程可用于处理您的消息。听起来好像这些线程正在被用完(也许回调处理您的消息需要一些时间?)。
您应该考虑使用 QueueClient.OnMessageAsync Method 来接收消息。
OnMessage 和 OnMessageAsync 不阻止调用。这些需要实例化一次,并将继续订阅队列,直到应用程序终止。
有关详细信息,请参阅相关 post: