我想使用条件变量来了解消息队列何时不为空,我想在 "HandleMessageQueue" 中将其用作线程

I would like to use Condition Variable in order to know when Messages Queue is not empty, i would like to use it in "HandleMessageQueue" as a thread

我想使用条件变量来了解消息队列何时不为空,我想在"HandleMessageQueue"中将其用作线程

private static Queue<Message> messages = new Queue<Message>();

/// <summary>
/// function return the first message
/// </summary>
/// <returns>first message element</returns>
public static Message GetFirst()
{
  return messages.Dequeue();
}

另一个class:

/// <summary>
/// Function run while the clients connected and handle the queue message
/// </summary>
public static void HandleMessageQueue()
{
   // ...
}

是这样的吗?

 public class EventArgs<T> : EventArgs
{
    private T eventData;


    public EventArgs(T eventData)
    {
        this.eventData = eventData;
    }


    public T EventData
    {
        get { return eventData; }
    }
}
public class ObservableQueue<T>
{
    public event EventHandler<EventArgs<T>> EnQueued;
    public event EventHandler<EventArgs<T>> DeQueued;
    public int Count { get { return queue.Count; } }

    private readonly Queue<T> queue = new Queue<T>();

    protected virtual void OnEnqueued(T item)
    {
        if (EnQueued != null)
            EnQueued(this, new EventArgs<T>(item));
    }

    protected virtual void OnDequeued(T item)
    {
        if (DeQueued != null)
            DeQueued(this, new EventArgs<T>(item));
    }

    public virtual void Enqueue(T item)
    {
        queue.Enqueue(item);
        OnEnqueued(item);
    }

    public virtual T Dequeue()
    {
        var item = queue.Dequeue();
        OnDequeued(item);
        return item;
    }
}

并使用它

   static void Main(string[] args)
    {

        ObservableQueue<string> observableQueue = new ObservableQueue<string>();
        observableQueue.EnQueued += ObservableQueue_EnQueued;
        observableQueue.DeQueued += ObservableQueue_DeQueued;
        observableQueue.Enqueue("abc");
        observableQueue.Dequeue();

        Console.Read();
    }

您可能正在寻找的是一个简单的生产者-消费者模式。在这种情况下,我建议使用 .NET 的 BlockingCollection,它可以让您轻松处理以下情况:

  1. 让一个线程在队列中推送东西
  2. 有另一个线程块直到东西可用
  3. 让整个事情变得容易关闭而不必强行终止线程

这是一个简短的代码示例,请阅读评论以了解有关每一位功能的更多信息:

public class Queue : IDisposable
{
    private readonly Thread _messageThread; // thread for processing messages
    private readonly BlockingCollection<Message> _messages; // queue for messages
    private readonly CancellationTokenSource _cancellation; // used to abort the processing when we're done

    // initializes everything and starts a processing thread
    public Queue()
    {
        _messages = new BlockingCollection<Message>();
        _cancellation = new CancellationTokenSource();

        _messageThread = new Thread(ProcessMessages);
        _messageThread.Start();
    }

    // processing thread function
    private void ProcessMessages()
    {
        try
        {
            while (!_cancellation.IsCancellationRequested)
            {
                // Take() blocks until either:
                // 1) a message is available, in which case it returns it, or
                // 2) the cancellation token is cancelled, in which case it throws an OperationCanceledException

                var message = _messages.Take(_cancellation.Token); 
                // process the message here
            }
        }
        catch (OperationCanceledException)
        {
            // Take() was cancelled, let the thread exit
        }
    }

    // pushes a message
    public void QueueMessage(Message message)
    {
        _messages.Add(message);
    }

    // stops processing and clean up resources
    public void Dispose()
    {
        _cancellation.Cancel(); // let Take() abort by throwing
        _messageThread.Join(); // wait for thread to exit
        _cancellation.Dispose(); // release the cancellation source
        _messages.Dispose(); // release the queue
    }
}

另一种选择是将 ConcurrentQueue<T>ManualResetEvent 组合(事件在 .NET 中大致等同于条件变量),但这需要手动完成 BlockingCollection<T>确实)。