保持请求直到消息在 ConcurrentQueue 前面不起作用
Holding requests until a message is in front of ConcurrentQueue not working
我遇到了一个问题,我的单例中的 ConcurrentQueue 似乎没有按正确的顺序处理项目。我知道这是 FIFO,所以我在想也许内存中的队列不一样,或者我的 Dequeue 出了什么问题?我对此进行测试的方法是快速向我的 API 端点发出 3 个邮递员请求。如果有人能帮助我理解为什么它们不是 运行 彼此相继出现,我将不胜感激!
截至目前,我倾向于 Queue.TryPeek 无法正常工作,因为第二个和第三个请求似乎在第一个请求出队之前排队。
所以当我 运行 下面的代码时,我在控制台中看到以下输出。
Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3
这是我的 API 控制器方法,它正在获取消息并将该消息排队,一旦消息排队,它将等待直到它在前面看到该请求的消息,然后发送它,然后将其出列.
控制器
[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
Console.WriteLine($"Queued message: {request.Message}");
_messageQueue.QueueAndWaitForTurn(request);
Console.WriteLine($"Sending message: {request.Message}");
var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
Console.WriteLine($"Dequeuing message: {request.Message}");
_messageQueue.DequeueMessage(request);
Console.WriteLine($"Returning response: {request.Message}");
return Ok(sendMessageResponse);
}
至于队列,我将它绑定到 IoC,如下所示:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMessageQueue, MessageQueue>();
services.AddScoped<IMessageService, MessageService>();
services.AddMvc();
}
这是我的队列 class 单例,我在这里使用单例,因为我希望在应用程序的整个生命周期中只有一个该队列的实例。
public class MessageQueue : IMessageQueue
{
private Lazy<ConcurrentQueue<Message>> _queue =
new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());
public ConcurrentQueue<Message> Queue
{
get
{
return _queue.Value;
}
}
public void QueueAndWaitForTurn(Message message)
{
Queue.Enqueue(message);
WaitForTurn();
}
public bool DequeueMessage(Message message)
{
var messageIsDequeued = Queue.TryDequeue(out message);
return messageIsDequeued;
}
public void WaitForTurn()
{
Message myMessage = null;
var myMessageIsNext = Queue.TryPeek(out myMessage);
while (!Queue.TryPeek(out myMessage))
{
Thread.Sleep(1000);
WaitForTurn();
}
}
}
我会创建一种 FifoSemaphore:
public class FifoSemaphore : IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
private readonly object _lockObject = new object();
public async Task WaitAsync()
{
// Enqueue a task
Task resultTask;
lock (_lockObject)
{
var tcs = new TaskCompletionSource<object>();
resultTask = tcs.Task;
_taskQueue.Enqueue(tcs);
}
// Wait for the lock
await _semaphore.WaitAsync();
// Dequeue the next item and set it to resolved (release back to API call)
TaskCompletionSource<object> queuedItem;
lock (_lockObject)
{
queuedItem = _taskQueue.Dequeue();
}
queuedItem.SetResult(null);
// Await our own task
await resultTask;
}
public void Release()
{
// Release the semaphore so another waiting thread can enter
_semaphore.Release();
}
public void Dispose()
{
_semaphore?.Dispose();
}
}
然后像这样使用它:
[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
try
{
await _fifoSemaphore.WaitAsync();
// process message code here
}
finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
{
_fifoSemaphore.Release();
}
}
想法是每个等待的项目将首先排队。
接下来,我们等待信号量让我们进入(我们的信号量一次允许一项)。
然后我们将下一个等待项目出列,并将其释放回 API 方法。
最后,我们等待自己在队列中的位置完成,然后return进入API方法。
在API方法中,我们异步等待轮到我们,完成我们的任务,然后return。包含一个 try/finally 以确保即使在失败的情况下也会为后续消息释放信号量。
我遇到了一个问题,我的单例中的 ConcurrentQueue 似乎没有按正确的顺序处理项目。我知道这是 FIFO,所以我在想也许内存中的队列不一样,或者我的 Dequeue 出了什么问题?我对此进行测试的方法是快速向我的 API 端点发出 3 个邮递员请求。如果有人能帮助我理解为什么它们不是 运行 彼此相继出现,我将不胜感激!
截至目前,我倾向于 Queue.TryPeek 无法正常工作,因为第二个和第三个请求似乎在第一个请求出队之前排队。
所以当我 运行 下面的代码时,我在控制台中看到以下输出。
Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3
这是我的 API 控制器方法,它正在获取消息并将该消息排队,一旦消息排队,它将等待直到它在前面看到该请求的消息,然后发送它,然后将其出列.
控制器
[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
Console.WriteLine($"Queued message: {request.Message}");
_messageQueue.QueueAndWaitForTurn(request);
Console.WriteLine($"Sending message: {request.Message}");
var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
Console.WriteLine($"Dequeuing message: {request.Message}");
_messageQueue.DequeueMessage(request);
Console.WriteLine($"Returning response: {request.Message}");
return Ok(sendMessageResponse);
}
至于队列,我将它绑定到 IoC,如下所示:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMessageQueue, MessageQueue>();
services.AddScoped<IMessageService, MessageService>();
services.AddMvc();
}
这是我的队列 class 单例,我在这里使用单例,因为我希望在应用程序的整个生命周期中只有一个该队列的实例。
public class MessageQueue : IMessageQueue
{
private Lazy<ConcurrentQueue<Message>> _queue =
new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());
public ConcurrentQueue<Message> Queue
{
get
{
return _queue.Value;
}
}
public void QueueAndWaitForTurn(Message message)
{
Queue.Enqueue(message);
WaitForTurn();
}
public bool DequeueMessage(Message message)
{
var messageIsDequeued = Queue.TryDequeue(out message);
return messageIsDequeued;
}
public void WaitForTurn()
{
Message myMessage = null;
var myMessageIsNext = Queue.TryPeek(out myMessage);
while (!Queue.TryPeek(out myMessage))
{
Thread.Sleep(1000);
WaitForTurn();
}
}
}
我会创建一种 FifoSemaphore:
public class FifoSemaphore : IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
private readonly object _lockObject = new object();
public async Task WaitAsync()
{
// Enqueue a task
Task resultTask;
lock (_lockObject)
{
var tcs = new TaskCompletionSource<object>();
resultTask = tcs.Task;
_taskQueue.Enqueue(tcs);
}
// Wait for the lock
await _semaphore.WaitAsync();
// Dequeue the next item and set it to resolved (release back to API call)
TaskCompletionSource<object> queuedItem;
lock (_lockObject)
{
queuedItem = _taskQueue.Dequeue();
}
queuedItem.SetResult(null);
// Await our own task
await resultTask;
}
public void Release()
{
// Release the semaphore so another waiting thread can enter
_semaphore.Release();
}
public void Dispose()
{
_semaphore?.Dispose();
}
}
然后像这样使用它:
[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
try
{
await _fifoSemaphore.WaitAsync();
// process message code here
}
finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
{
_fifoSemaphore.Release();
}
}
想法是每个等待的项目将首先排队。
接下来,我们等待信号量让我们进入(我们的信号量一次允许一项)。
然后我们将下一个等待项目出列,并将其释放回 API 方法。
最后,我们等待自己在队列中的位置完成,然后return进入API方法。
在API方法中,我们异步等待轮到我们,完成我们的任务,然后return。包含一个 try/finally 以确保即使在失败的情况下也会为后续消息释放信号量。