我想使用条件变量来了解消息队列何时不为空,我想在 "HandleMessageQueue" 中将其用作线程
I would like to use a condition variable to know when the message queue is not empty, and I would like to use it in "HandleMessageQueue", which runs as a thread.
我想使用条件变量来了解消息队列何时不为空,我想在"HandleMessageQueue"中将其用作线程
private static Queue<Message> messages = new Queue<Message>();

/// <summary>
/// Removes the message at the head of the queue and hands it to the caller.
/// </summary>
/// <remarks>
/// No locking or emptiness check is performed in this method; the call is
/// forwarded straight to <c>Dequeue()</c> on the shared queue.
/// </remarks>
/// <returns>The message that was at the front of the queue.</returns>
public static Message GetFirst()
{
    Message head = messages.Dequeue();
    return head;
}
另一个class:
/// <summary>
/// Runs while the clients are connected and handles the messages in the queue.
/// </summary>
/// <remarks>
/// NOTE(review): the body is elided in this excerpt, so the summary restates the
/// original author's stated intent rather than behavior visible in the code.
/// </remarks>
public static void HandleMessageQueue()
{
    // ...
}
是这样的吗?
/// <summary>
/// Event arguments that carry a single strongly typed payload.
/// </summary>
/// <typeparam name="T">Type of the payload attached to the event.</typeparam>
public class EventArgs<T> : EventArgs
{
    // readonly: the payload is assigned once in the constructor and never replaced,
    // so handlers all observe the same value.
    private readonly T eventData;

    /// <summary>
    /// Creates an arguments object wrapping <paramref name="eventData"/>.
    /// </summary>
    /// <param name="eventData">Payload exposed through <see cref="EventData"/>.</param>
    public EventArgs(T eventData)
    {
        this.eventData = eventData;
    }

    /// <summary>
    /// Gets the payload supplied at construction time.
    /// </summary>
    public T EventData
    {
        get { return eventData; }
    }
}
/// <summary>
/// A first-in, first-out queue that raises an event each time an item is added or removed.
/// </summary>
/// <remarks>
/// Access to the inner queue is not synchronized by this class.
/// </remarks>
/// <typeparam name="T">Type of the items stored in the queue.</typeparam>
public class ObservableQueue<T>
{
    /// <summary>Raised by <see cref="Enqueue"/> after the item has been added.</summary>
    public event EventHandler<EventArgs<T>> EnQueued;

    /// <summary>Raised by <see cref="Dequeue"/> after the item has been removed.</summary>
    public event EventHandler<EventArgs<T>> DeQueued;

    /// <summary>Gets the number of items currently held in the queue.</summary>
    public int Count { get { return queue.Count; } }

    private readonly Queue<T> queue = new Queue<T>();

    /// <summary>
    /// Raises <see cref="EnQueued"/> for <paramref name="item"/>.
    /// </summary>
    /// <param name="item">The item that was just added.</param>
    protected virtual void OnEnqueued(T item)
    {
        // Copy the delegate to a local before testing it: reading the event twice
        // (once for the null check, once for the call) lets the last subscriber
        // unsubscribe in between and turn the call into a NullReferenceException.
        EventHandler<EventArgs<T>> handler = EnQueued;
        if (handler != null)
        {
            handler(this, new EventArgs<T>(item));
        }
    }

    /// <summary>
    /// Raises <see cref="DeQueued"/> for <paramref name="item"/>.
    /// </summary>
    /// <param name="item">The item that was just removed.</param>
    protected virtual void OnDequeued(T item)
    {
        // Same local-copy pattern as OnEnqueued, for the same reason.
        EventHandler<EventArgs<T>> handler = DeQueued;
        if (handler != null)
        {
            handler(this, new EventArgs<T>(item));
        }
    }

    /// <summary>
    /// Adds <paramref name="item"/> to the end of the queue, then raises <see cref="EnQueued"/>.
    /// </summary>
    /// <param name="item">The item to add.</param>
    public virtual void Enqueue(T item)
    {
        queue.Enqueue(item);
        OnEnqueued(item);
    }

    /// <summary>
    /// Removes the item at the head of the queue, raises <see cref="DeQueued"/>, and returns the item.
    /// </summary>
    /// <returns>The item that was at the head of the queue.</returns>
    public virtual T Dequeue()
    {
        var item = queue.Dequeue();
        OnDequeued(item);
        return item;
    }
}
并使用它
// Demo entry point: subscribe to both notifications, push one item through
// the queue, then wait for console input so the output stays visible.
static void Main(string[] args)
{
    var queue = new ObservableQueue<string>();

    // Hook both events before the queue is touched.
    queue.DeQueued += ObservableQueue_DeQueued;
    queue.EnQueued += ObservableQueue_EnQueued;

    // One round trip: add an item, then take it back out.
    queue.Enqueue("abc");
    queue.Dequeue();

    Console.Read();
}
您可能正在寻找的是一个简单的生产者-消费者模式。在这种情况下,我建议使用 .NET 的 BlockingCollection<T>,它可以让您轻松处理以下情况:
- 让一个线程向队列中推送数据
- 让另一个线程阻塞,直到有数据可用
- 让整个流程可以轻松关闭,而不必强行终止线程
下面是一个简短的代码示例,请阅读注释以了解每一部分的作用:
/// <summary>
/// Message queue backed by a <see cref="BlockingCollection{T}"/> and drained by a
/// dedicated thread that is started in the constructor and stopped by <see cref="Dispose"/>.
/// </summary>
public class Queue : IDisposable
{
    private readonly Thread _messageThread; // thread for processing messages
    private readonly BlockingCollection<Message> _messages; // queue for messages
    private readonly CancellationTokenSource _cancellation; // used to abort the processing when we're done
    private bool _disposed; // set by the first Dispose() call so that later calls are no-ops

    /// <summary>
    /// Initializes everything and starts the processing thread.
    /// </summary>
    public Queue()
    {
        _messages = new BlockingCollection<Message>();
        _cancellation = new CancellationTokenSource();
        _messageThread = new Thread(ProcessMessages);
        _messageThread.Start();
    }

    /// <summary>
    /// Processing thread function: takes messages one at a time until cancellation is requested.
    /// </summary>
    private void ProcessMessages()
    {
        try
        {
            while (!_cancellation.IsCancellationRequested)
            {
                // Take() blocks until either:
                // 1) a message is available, in which case it returns it, or
                // 2) the cancellation token is cancelled, in which case it throws an OperationCanceledException
                var message = _messages.Take(_cancellation.Token);
                // process the message here
            }
        }
        catch (OperationCanceledException)
        {
            // Take() was cancelled, let the thread exit
        }
    }

    /// <summary>
    /// Pushes a message onto the queue for the processing thread to pick up.
    /// </summary>
    /// <param name="message">The message to enqueue.</param>
    public void QueueMessage(Message message)
    {
        _messages.Add(message);
    }

    /// <summary>
    /// Stops processing and cleans up resources. Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        // Dispose must tolerate repeated calls. Without this guard a second call would
        // invoke Cancel() on an already disposed CancellationTokenSource, which is
        // documented to throw ObjectDisposedException.
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        _cancellation.Cancel(); // let Take() abort by throwing
        _messageThread.Join(); // wait for thread to exit
        _cancellation.Dispose(); // release the cancellation source
        _messages.Dispose(); // release the queue
    }
}
另一种选择是将 ConcurrentQueue<T> 与 ManualResetEvent 组合使用(事件在 .NET 中大致等同于条件变量),但这需要手动完成 BlockingCollection<T> 已经替您做好的工作。
我想使用条件变量来了解消息队列何时不为空,我想在"HandleMessageQueue"中将其用作线程
private static Queue<Message> messages = new Queue<Message>();

/// <summary>
/// Removes the message at the head of the queue and hands it to the caller.
/// </summary>
/// <remarks>
/// No locking or emptiness check is performed in this method; the call is
/// forwarded straight to <c>Dequeue()</c> on the shared queue.
/// </remarks>
/// <returns>The message that was at the front of the queue.</returns>
public static Message GetFirst()
{
    Message head = messages.Dequeue();
    return head;
}
另一个class:
/// <summary>
/// Runs while the clients are connected and handles the messages in the queue.
/// </summary>
/// <remarks>
/// NOTE(review): the body is elided in this excerpt, so the summary restates the
/// original author's stated intent rather than behavior visible in the code.
/// </remarks>
public static void HandleMessageQueue()
{
    // ...
}
是这样的吗?
/// <summary>
/// Event arguments that carry a single strongly typed payload.
/// </summary>
/// <typeparam name="T">Type of the payload attached to the event.</typeparam>
public class EventArgs<T> : EventArgs
{
    // readonly: the payload is assigned once in the constructor and never replaced,
    // so handlers all observe the same value.
    private readonly T eventData;

    /// <summary>
    /// Creates an arguments object wrapping <paramref name="eventData"/>.
    /// </summary>
    /// <param name="eventData">Payload exposed through <see cref="EventData"/>.</param>
    public EventArgs(T eventData)
    {
        this.eventData = eventData;
    }

    /// <summary>
    /// Gets the payload supplied at construction time.
    /// </summary>
    public T EventData
    {
        get { return eventData; }
    }
}
/// <summary>
/// A first-in, first-out queue that raises an event each time an item is added or removed.
/// </summary>
/// <remarks>
/// Access to the inner queue is not synchronized by this class.
/// </remarks>
/// <typeparam name="T">Type of the items stored in the queue.</typeparam>
public class ObservableQueue<T>
{
    /// <summary>Raised by <see cref="Enqueue"/> after the item has been added.</summary>
    public event EventHandler<EventArgs<T>> EnQueued;

    /// <summary>Raised by <see cref="Dequeue"/> after the item has been removed.</summary>
    public event EventHandler<EventArgs<T>> DeQueued;

    /// <summary>Gets the number of items currently held in the queue.</summary>
    public int Count { get { return queue.Count; } }

    private readonly Queue<T> queue = new Queue<T>();

    /// <summary>
    /// Raises <see cref="EnQueued"/> for <paramref name="item"/>.
    /// </summary>
    /// <param name="item">The item that was just added.</param>
    protected virtual void OnEnqueued(T item)
    {
        // Copy the delegate to a local before testing it: reading the event twice
        // (once for the null check, once for the call) lets the last subscriber
        // unsubscribe in between and turn the call into a NullReferenceException.
        EventHandler<EventArgs<T>> handler = EnQueued;
        if (handler != null)
        {
            handler(this, new EventArgs<T>(item));
        }
    }

    /// <summary>
    /// Raises <see cref="DeQueued"/> for <paramref name="item"/>.
    /// </summary>
    /// <param name="item">The item that was just removed.</param>
    protected virtual void OnDequeued(T item)
    {
        // Same local-copy pattern as OnEnqueued, for the same reason.
        EventHandler<EventArgs<T>> handler = DeQueued;
        if (handler != null)
        {
            handler(this, new EventArgs<T>(item));
        }
    }

    /// <summary>
    /// Adds <paramref name="item"/> to the end of the queue, then raises <see cref="EnQueued"/>.
    /// </summary>
    /// <param name="item">The item to add.</param>
    public virtual void Enqueue(T item)
    {
        queue.Enqueue(item);
        OnEnqueued(item);
    }

    /// <summary>
    /// Removes the item at the head of the queue, raises <see cref="DeQueued"/>, and returns the item.
    /// </summary>
    /// <returns>The item that was at the head of the queue.</returns>
    public virtual T Dequeue()
    {
        var item = queue.Dequeue();
        OnDequeued(item);
        return item;
    }
}
并使用它
// Demo entry point: subscribe to both notifications, push one item through
// the queue, then wait for console input so the output stays visible.
static void Main(string[] args)
{
    var queue = new ObservableQueue<string>();

    // Hook both events before the queue is touched.
    queue.DeQueued += ObservableQueue_DeQueued;
    queue.EnQueued += ObservableQueue_EnQueued;

    // One round trip: add an item, then take it back out.
    queue.Enqueue("abc");
    queue.Dequeue();

    Console.Read();
}
您可能正在寻找的是一个简单的生产者-消费者模式。在这种情况下,我建议使用 .NET 的 BlockingCollection<T>,它可以让您轻松处理以下情况:
- 让一个线程向队列中推送数据
- 让另一个线程阻塞,直到有数据可用
- 让整个流程可以轻松关闭,而不必强行终止线程
下面是一个简短的代码示例,请阅读注释以了解每一部分的作用:
/// <summary>
/// Message queue backed by a <see cref="BlockingCollection{T}"/> and drained by a
/// dedicated thread that is started in the constructor and stopped by <see cref="Dispose"/>.
/// </summary>
public class Queue : IDisposable
{
    private readonly Thread _messageThread; // thread for processing messages
    private readonly BlockingCollection<Message> _messages; // queue for messages
    private readonly CancellationTokenSource _cancellation; // used to abort the processing when we're done
    private bool _disposed; // set by the first Dispose() call so that later calls are no-ops

    /// <summary>
    /// Initializes everything and starts the processing thread.
    /// </summary>
    public Queue()
    {
        _messages = new BlockingCollection<Message>();
        _cancellation = new CancellationTokenSource();
        _messageThread = new Thread(ProcessMessages);
        _messageThread.Start();
    }

    /// <summary>
    /// Processing thread function: takes messages one at a time until cancellation is requested.
    /// </summary>
    private void ProcessMessages()
    {
        try
        {
            while (!_cancellation.IsCancellationRequested)
            {
                // Take() blocks until either:
                // 1) a message is available, in which case it returns it, or
                // 2) the cancellation token is cancelled, in which case it throws an OperationCanceledException
                var message = _messages.Take(_cancellation.Token);
                // process the message here
            }
        }
        catch (OperationCanceledException)
        {
            // Take() was cancelled, let the thread exit
        }
    }

    /// <summary>
    /// Pushes a message onto the queue for the processing thread to pick up.
    /// </summary>
    /// <param name="message">The message to enqueue.</param>
    public void QueueMessage(Message message)
    {
        _messages.Add(message);
    }

    /// <summary>
    /// Stops processing and cleans up resources. Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        // Dispose must tolerate repeated calls. Without this guard a second call would
        // invoke Cancel() on an already disposed CancellationTokenSource, which is
        // documented to throw ObjectDisposedException.
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        _cancellation.Cancel(); // let Take() abort by throwing
        _messageThread.Join(); // wait for thread to exit
        _cancellation.Dispose(); // release the cancellation source
        _messages.Dispose(); // release the queue
    }
}
另一种选择是将 ConcurrentQueue<T> 与 ManualResetEvent 组合使用(事件在 .NET 中大致等同于条件变量),但这需要手动完成 BlockingCollection<T> 已经替您做好的工作。