保持请求直到消息在 ConcurrentQueue 前面不起作用

Holding requests until a message is in front of ConcurrentQueue not working

我遇到了一个问题,我的单例中的 ConcurrentQueue 似乎没有按正确的顺序处理项目。我知道这是 FIFO,所以我在想也许内存中的队列不一样,或者我的 Dequeue 出了什么问题?我对此进行测试的方法是快速向我的 API 端点发出 3 个邮递员请求。如果有人能帮助我理解为什么它们不是 运行 彼此相继出现,我将不胜感激!

截至目前,我倾向于 Queue.TryPeek 无法正常工作,因为第二个和第三个请求似乎在第一个请求出队之前排队。

所以当我 运行 下面的代码时,我在控制台中看到以下输出。

Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3

这是我的 API 控制器方法,它正在获取消息并将该消息排队,一旦消息排队,它将等待直到它在前面看到该请求的消息,然后发送它,然后将其出列.

控制器

[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
    Console.WriteLine($"Queued message: {request.Message}");
    _messageQueue.QueueAndWaitForTurn(request);
    Console.WriteLine($"Sending message: {request.Message}");
    var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
    Console.WriteLine($"Dequeuing message: {request.Message}");
    _messageQueue.DequeueMessage(request);
    Console.WriteLine($"Returning response: {request.Message}");
    return Ok(sendMessageResponse);
}

至于队列,我将它绑定到 IoC,如下所示:

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IMessageQueue, MessageQueue>();
    services.AddScoped<IMessageService, MessageService>();

    services.AddMvc();
}

这是我的队列 class 单例,我在这里使用单例,因为我希望在应用程序的整个生命周期中只有一个该队列的实例。

public class MessageQueue : IMessageQueue
{
    private Lazy<ConcurrentQueue<Message>> _queue = 
        new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());

    public ConcurrentQueue<Message> Queue
    {
        get
        {
            return _queue.Value;
        }
    }

    public void QueueAndWaitForTurn(Message message)
    {
        Queue.Enqueue(message);

        WaitForTurn();
    }

    public bool DequeueMessage(Message message)
    {
        var messageIsDequeued = Queue.TryDequeue(out message);

        return messageIsDequeued;
    }

    public void WaitForTurn()
    {
        Message myMessage = null;
        var myMessageIsNext = Queue.TryPeek(out myMessage);

        while (!Queue.TryPeek(out myMessage))
        {
            Thread.Sleep(1000);
            WaitForTurn();
        }
    }
}

我会创建一种 FifoSemaphore:

public class FifoSemaphore : IDisposable
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
    private readonly object _lockObject = new object();

    public async Task WaitAsync()
    {
        // Enqueue a task
        Task resultTask;
        lock (_lockObject)
        {
            var tcs = new TaskCompletionSource<object>();
            resultTask = tcs.Task;
            _taskQueue.Enqueue(tcs);
        }

        // Wait for the lock
        await _semaphore.WaitAsync();

        // Dequeue the next item and set it to resolved (release back to API call)
        TaskCompletionSource<object> queuedItem;
        lock (_lockObject)
        {
            queuedItem = _taskQueue.Dequeue();
        }
        queuedItem.SetResult(null);

        // Await our own task
        await resultTask;
    }

    public void Release()
    {
        // Release the semaphore so another waiting thread can enter
        _semaphore.Release();
    }

    public void Dispose()
    {
        _semaphore?.Dispose();
    }
}

然后像这样使用它:

[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
    try
    {
        await _fifoSemaphore.WaitAsync();
        // process message code here
    }
    finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
    {
        _fifoSemaphore.Release();
    }
}

想法是每个等待的项目将首先排队。

接下来,我们等待信号量让我们进入(我们的信号量一次允许一项)。

然后我们将下一个等待项目出列,并将其释放回 API 方法。

最后,我们等待自己在队列中的位置完成,然后return进入API方法。

在API方法中,我们异步等待轮到我们,完成我们的任务,然后return。包含一个 try/finally 以确保即使在失败的情况下也会为后续消息释放信号量。