服务总线队列多个消费者中的取消令牌处理
Cancellation token handling with multiple consumers of a Service Bus queue
我在单个进程中运行数量可配置的服务总线队列消费者。该代码使用 QueueClient 类的 ReceiveAsync 方法,并在取消时调用 QueueClient.Close。
它工作得很好,但事实证明关闭 QueueClient 存在一些问题 - 只有一个客户端立即结束,所有其他客户端挂起直到 serverWaitTime 超时到期。
查看代码及其输出:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
/// <summary>
/// Demo entry point: starts three queue workers and requests cancellation after ten seconds.
/// </summary>
public class Program
{
    /// <summary>
    /// Runs the workers until cancellation is requested, then waits for all of them to finish.
    /// </summary>
    private static void Main()
    {
        // The source is disposed only after every task that uses it has completed.
        using (var source = new CancellationTokenSource())
        {
            var cancellationToken = source.Token;
            var logger = new Logger();

            // Await the delay rather than blocking a thread-pool thread with Wait().
            var cancellationTask = Task.Run(async () =>
            {
                await Task.Delay(TimeSpan.FromSeconds(10));
                source.Cancel();
                logger.Log("Cancellation requested.");
            });

            string connectionString = "...";
            string queueName = "...";

            var workers = Enumerable.Range(1, 3).Select(i => new Worker(connectionString, queueName, logger));
            var tasks = workers
                .Select(worker => Task.Run(() => worker.RunAsync(cancellationToken), cancellationToken))
                .ToArray();

            Task.WaitAll(tasks);

            // Observe the cancellation task so its exceptions surface and it cannot outlive the source.
            cancellationTask.Wait();

            logger.Log("The end.");
        }
    }
}
/// <summary>
/// Receives messages from a Service Bus queue in a loop until cancellation is requested.
/// </summary>
class Worker
{
    private readonly Logger _logger;
    private readonly QueueClient _queueClient;

    /// <summary>
    /// Creates a worker and the <see cref="QueueClient"/> it receives from.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string.</param>
    /// <param name="queueName">Name of the queue to receive from.</param>
    /// <param name="logger">Logger used for progress output.</param>
    public Worker(string connectionString, string queueName, Logger logger)
    {
        _logger = logger;
        _queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
    }

    /// <summary>
    /// Receives and logs messages until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Token whose cancellation closes the queue client and ends the loop.</param>
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        _logger.Log($"Worker {GetHashCode()} started.");

        // Closing the client on cancellation is how a pending ReceiveAsync gets interrupted.
        using (cancellationToken.Register(() => _queueClient.Close()))
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var message = await _queueClient.ReceiveAsync(TimeSpan.FromSeconds(20));
                    if (message == null)
                    {
                        // No message arrived within the wait time; poll again instead of dereferencing null.
                        continue;
                    }

                    _logger.Log($"Worker {GetHashCode()}: Process message {message.MessageId}...");
                }
                catch (OperationCanceledException ex)
                {
                    _logger.Log($"Worker {GetHashCode()}: {ex.Message}");
                }
            }
        }

        _logger.Log($"Worker {GetHashCode()} finished.");
    }
}
/// <summary>
/// Writes messages to the console, each prefixed with the time elapsed since the logger was created.
/// </summary>
class Logger
{
    // Started when the logger is constructed; its Elapsed value prefixes every line.
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();

    /// <summary>
    /// Prints the elapsed time followed by <paramref name="message"/>.
    /// </summary>
    /// <param name="message">Text to print after the timestamp.</param>
    public void Log(string message)
    {
        var elapsed = _stopwatch.Elapsed;
        Console.WriteLine($"{elapsed}: {message}");
    }
}
输出:
00:00:00.8125644: Worker 12547953 started.
00:00:00.8127684: Worker 45653674 started.
00:00:00.8127314: Worker 59817589 started.
00:00:10.4534961: Cancellation requested.
00:00:11.4912900: Worker 45653674: The operation cannot be performed because the entity has been closed or aborted.
00:00:11.4914054: Worker 45653674 finished.
00:00:22.3242631: Worker 12547953: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3244501: Worker 12547953 finished.
00:00:22.3243945: Worker 59817589: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3252456: Worker 59817589 finished.
00:00:22.3253535: The end.
可以看到,Worker 45653674 立即停止,而另外两个 Worker 又过了大约 10 秒才停止。
我在这篇文章中找到了一些有用的信息:https://developers.de/blogs/damir_dobric/archive/2013/12/03/service-bus-undocumented-scaling-tips-amp-tricks.aspx。如果每个队列客户端都通过自己的物理连接工作,问题就会消失。
因此,要解决此问题,有必要替换以下代码:
// Current code in the Worker constructor: the client is created directly from the connection string.
_queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
替换为:
// Create a separate MessagingFactory for this worker and obtain the client from it.
// Per the article referenced above, the intent is that each queue client then works
// over its own physical connection — NOTE(review): confirm against the Service Bus docs.
var factory = MessagingFactory.CreateFromConnectionString(connectionString);
_queueClient = factory.CreateQueueClient(queueName);
我在单个进程中运行数量可配置的服务总线队列消费者。该代码使用 QueueClient 类的 ReceiveAsync 方法,并在取消时调用 QueueClient.Close。
它工作得很好,但事实证明关闭 QueueClient 存在一些问题 - 只有一个客户端立即结束,所有其他客户端挂起直到 serverWaitTime 超时到期。
查看代码及其输出:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
/// <summary>
/// Demo entry point: starts three queue workers and requests cancellation after ten seconds.
/// </summary>
public class Program
{
    /// <summary>
    /// Runs the workers until cancellation is requested, then waits for all of them to finish.
    /// </summary>
    private static void Main()
    {
        // The source is disposed only after every task that uses it has completed.
        using (var source = new CancellationTokenSource())
        {
            var cancellationToken = source.Token;
            var logger = new Logger();

            // Await the delay rather than blocking a thread-pool thread with Wait().
            var cancellationTask = Task.Run(async () =>
            {
                await Task.Delay(TimeSpan.FromSeconds(10));
                source.Cancel();
                logger.Log("Cancellation requested.");
            });

            string connectionString = "...";
            string queueName = "...";

            var workers = Enumerable.Range(1, 3).Select(i => new Worker(connectionString, queueName, logger));
            var tasks = workers
                .Select(worker => Task.Run(() => worker.RunAsync(cancellationToken), cancellationToken))
                .ToArray();

            Task.WaitAll(tasks);

            // Observe the cancellation task so its exceptions surface and it cannot outlive the source.
            cancellationTask.Wait();

            logger.Log("The end.");
        }
    }
}
/// <summary>
/// Receives messages from a Service Bus queue in a loop until cancellation is requested.
/// </summary>
class Worker
{
    private readonly Logger _logger;
    private readonly QueueClient _queueClient;

    /// <summary>
    /// Creates a worker and the <see cref="QueueClient"/> it receives from.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string.</param>
    /// <param name="queueName">Name of the queue to receive from.</param>
    /// <param name="logger">Logger used for progress output.</param>
    public Worker(string connectionString, string queueName, Logger logger)
    {
        _logger = logger;
        _queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
    }

    /// <summary>
    /// Receives and logs messages until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Token whose cancellation closes the queue client and ends the loop.</param>
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        _logger.Log($"Worker {GetHashCode()} started.");

        // Closing the client on cancellation is how a pending ReceiveAsync gets interrupted.
        using (cancellationToken.Register(() => _queueClient.Close()))
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var message = await _queueClient.ReceiveAsync(TimeSpan.FromSeconds(20));
                    if (message == null)
                    {
                        // No message arrived within the wait time; poll again instead of dereferencing null.
                        continue;
                    }

                    _logger.Log($"Worker {GetHashCode()}: Process message {message.MessageId}...");
                }
                catch (OperationCanceledException ex)
                {
                    _logger.Log($"Worker {GetHashCode()}: {ex.Message}");
                }
            }
        }

        _logger.Log($"Worker {GetHashCode()} finished.");
    }
}
/// <summary>
/// Writes messages to the console, each prefixed with the time elapsed since the logger was created.
/// </summary>
class Logger
{
    // Started when the logger is constructed; its Elapsed value prefixes every line.
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();

    /// <summary>
    /// Prints the elapsed time followed by <paramref name="message"/>.
    /// </summary>
    /// <param name="message">Text to print after the timestamp.</param>
    public void Log(string message)
    {
        var elapsed = _stopwatch.Elapsed;
        Console.WriteLine($"{elapsed}: {message}");
    }
}
输出:
00:00:00.8125644: Worker 12547953 started.
00:00:00.8127684: Worker 45653674 started.
00:00:00.8127314: Worker 59817589 started.
00:00:10.4534961: Cancellation requested.
00:00:11.4912900: Worker 45653674: The operation cannot be performed because the entity has been closed or aborted.
00:00:11.4914054: Worker 45653674 finished.
00:00:22.3242631: Worker 12547953: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3244501: Worker 12547953 finished.
00:00:22.3243945: Worker 59817589: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3252456: Worker 59817589 finished.
00:00:22.3253535: The end.
可以看到,Worker 45653674 立即停止,而另外两个 Worker 又过了大约 10 秒才停止。
我在这篇文章中找到了一些有用的信息:https://developers.de/blogs/damir_dobric/archive/2013/12/03/service-bus-undocumented-scaling-tips-amp-tricks.aspx。如果每个队列客户端都通过自己的物理连接工作,问题就会消失。
因此,要解决此问题,有必要替换以下代码:
// Current code in the Worker constructor: the client is created directly from the connection string.
_queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
替换为:
// Create a separate MessagingFactory for this worker and obtain the client from it.
// Per the article referenced above, the intent is that each queue client then works
// over its own physical connection — NOTE(review): confirm against the Service Bus docs.
var factory = MessagingFactory.CreateFromConnectionString(connectionString);
_queueClient = factory.CreateQueueClient(queueName);