使用线程和 EventWaitHandle 的生产者/消费者模式
Producer/ Consumer pattern using threads and EventWaitHandle
我猜这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是 ReceivingThread()
或 SendingThread()
方法中的 while 循环是否会停止执行。请注意,EnqueueSend(DataSendEnqeueInfo info)
是从多个不同的线程调用的,我可能无法在此处使用任务,因为我肯定必须在单独的线程中使用命令。
private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;
private void ReceivingThread()
{
while (mIsRunning)
{
mRcWaitHandle.WaitOne();
DataRecievedEnqeueInfo item = null;
while (mReceivingThreadQueue.Count > 0)
{
lock (mReceivingQueueLock)
{
item = mReceivingThreadQueue.Dequeue();
}
ProcessReceivingItem(item);
}
mRcWaitHandle.Reset();
}
}
private void SendingThread()
{
while (mIsRunning)
{
mSeWaitHandle.WaitOne();
while (mSendingThreadQueue.Count > 0)
{
DataSendEnqeueInfo item = null;
lock (mSendingQueueLock)
{
item = mSendingThreadQueue.Dequeue();
}
ProcessSendingItem(item);
}
mSeWaitHandle.Reset();
}
}
internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
lock (mReceivingQueueLock)
{
mReceivingThreadQueue.Enqueue(info);
mRcWaitHandle.Set();
}
}
public void EnqueueSend(DataSendEnqeueInfo info)
{
lock (mSendingQueueLock)
{
mSendingThreadQueue.Enqueue(info);
mSeWaitHandle.Set();
}
}
P.S 这里的想法是,当队列为空时,我使用 WaitHandle
s 让线程进入休眠状态,并在新项目入队时发出信号让它们启动。
更新
对于可能尝试使用 TPL 或任务实施 Producer/Consumer 模式的人,我将离开这个 https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/。
使用 BlockingCollection 代替 Queue、EventWaitHandle 和锁定对象:
public class DataInfo { }
private Thread mReceivingThread;
private Thread mSendingThread;
private BlockingCollection<DataInfo> queue;
private CancellationTokenSource receivingCts = new CancellationTokenSource();
private void ReceivingThread()
{
try
{
while (!receivingCts.IsCancellationRequested)
{
// This will block until an item is added to the queue or the cancellation token is cancelled
DataInfo item = queue.Take(receivingCts.Token);
ProcessReceivingItem(item);
}
}
catch (OperationCanceledException)
{
}
}
internal void EnqueueRecevingData(DataInfo info)
{
// When a new item is produced, just add it to the queue
queue.Add(info);
}
// To cancel the receiving thread, cancel the token
private void CancelReceivingThread()
{
receivingCts.Cancel();
}
就个人而言,对于简单的 producer-consumer 问题,我只会使用 BlockingCollection。无需手动编写您自己的同步逻辑代码。如果队列中没有项目,消费线程也会阻塞。
如果您使用此 class:
,您的代码可能如下所示
private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();
public void Stop()
{
// No need for mIsRunning. Makes the enumerables in the GetConsumingEnumerable() calls
// below to complete.
mReceivingThreadQueue.CompleteAdding();
mSendingThreadQueue.CompleteAdding();
}
private void ReceivingThread()
{
foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
{
ProcessReceivingItem(item);
}
}
private void SendingThread()
{
foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
{
ProcessSendingItem(item);
}
}
internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
// You can also use TryAdd() if there is a possibility that you
// can add items after you have stopped. Otherwise, this can throw an
// an exception after CompleteAdding() has been called.
mReceivingThreadQueue.Add(info);
}
public void EnqueueSend(DataSendEnqeueInfo info)
{
mSendingThreadQueue.Add(info);
}
如评论中所建议,您也可以尝试使用 TPL Dataflow 块。
据我所知,您有两个类似的管道,用于接收和发送,所以我假设您的 class 层次结构是这样的:
class EnqueueInfo { }
class DataRecievedEnqeueInfo : EnqueueInfo { }
class DataSendEnqeueInfo : EnqueueInfo { }
我们可以 assemble 一个抽象 class 来封装创建管道的逻辑,并提供处理项目的接口,如下所示:
abstract class EnqueueInfoProcessor<T>
where T : EnqueueInfo
{
// here we will store all the messages received before the handling
private readonly BufferBlock<T> _buffer;
// simple action block for actual handling the items
private ActionBlock<T> _action;
// cancellation token to cancel the pipeline
public EnqueueInfoProcessor(CancellationToken token)
{
_buffer = new BufferBlock<T>(new DataflowBlockOptions { CancellationToken = token });
_action = new ActionBlock<T>(item => ProcessItem(item), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = token
});
// we are linking two blocks so all the items from buffer
// will flow down to action block in order they've been received
_buffer.LinkTo(_action, new DataflowLinkOptions { PropagateCompletion = true });
}
public void PostItem(T item)
{
// synchronously wait for posting to complete
_buffer.Post(item);
}
public async Task SendItemAsync(T item)
{
// asynchronously wait for message to be posted
await _buffer.SendAsync(item);
}
// abstract method to implement
protected abstract void ProcessItem(T item);
}
注意,你也可以使用Encapsulate<TInput, TOutput>
方法将link封装在两个block之间,但那样的话你必须妥善处理buffer block的Completion
,如果你正在使用它。
在此之后,我们只需要实现接收和发送句柄逻辑的两个方法:
public class SendEnqueueInfoProcessor : EnqueueInfoProcessor<DataSendEnqeueInfo>
{
SendEnqueueInfoProcessor(CancellationToken token)
: base(token)
{
}
protected override void ProcessItem(DataSendEnqeueInfo item)
{
// send logic here
}
}
public class RecievedEnqueueInfoProcessor : EnqueueInfoProcessor<DataRecievedEnqeueInfo>
{
RecievedEnqueueInfoProcessor(CancellationToken token)
: base(token)
{
}
protected override void ProcessItem(DataRecievedEnqeueInfo item)
{
// recieve logic here
}
}
您还可以使用 TransformBlock<DataRecievedEnqeueInfo, DataSendEnqeueInfo>
创建更复杂的管道,如果您的消息流是关于 ReceiveInfo
消息变成 SendInfo
。
我猜这是一种代码审查,但这是我对生产者/消费者模式的实现。我想知道的是 ReceivingThread()
或 SendingThread()
方法中的 while 循环是否会停止执行。请注意,EnqueueSend(DataSendEnqeueInfo info)
是从多个不同的线程调用的,我可能无法在此处使用任务,因为我肯定必须在单独的线程中使用命令。
private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;
private void ReceivingThread()
{
while (mIsRunning)
{
mRcWaitHandle.WaitOne();
DataRecievedEnqeueInfo item = null;
while (mReceivingThreadQueue.Count > 0)
{
lock (mReceivingQueueLock)
{
item = mReceivingThreadQueue.Dequeue();
}
ProcessReceivingItem(item);
}
mRcWaitHandle.Reset();
}
}
private void SendingThread()
{
while (mIsRunning)
{
mSeWaitHandle.WaitOne();
while (mSendingThreadQueue.Count > 0)
{
DataSendEnqeueInfo item = null;
lock (mSendingQueueLock)
{
item = mSendingThreadQueue.Dequeue();
}
ProcessSendingItem(item);
}
mSeWaitHandle.Reset();
}
}
internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
lock (mReceivingQueueLock)
{
mReceivingThreadQueue.Enqueue(info);
mRcWaitHandle.Set();
}
}
public void EnqueueSend(DataSendEnqeueInfo info)
{
lock (mSendingQueueLock)
{
mSendingThreadQueue.Enqueue(info);
mSeWaitHandle.Set();
}
}
P.S 这里的想法是,当队列为空时,我使用 WaitHandle
s 让线程进入休眠状态,并在新项目入队时发出信号让它们启动。
更新 对于可能尝试使用 TPL 或任务实施 Producer/Consumer 模式的人,我将离开这个 https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/。
使用 BlockingCollection 代替 Queue、EventWaitHandle 和锁定对象:
public class DataInfo { }
private Thread mReceivingThread;
private Thread mSendingThread;
private BlockingCollection<DataInfo> queue;
private CancellationTokenSource receivingCts = new CancellationTokenSource();
private void ReceivingThread()
{
try
{
while (!receivingCts.IsCancellationRequested)
{
// This will block until an item is added to the queue or the cancellation token is cancelled
DataInfo item = queue.Take(receivingCts.Token);
ProcessReceivingItem(item);
}
}
catch (OperationCanceledException)
{
}
}
internal void EnqueueRecevingData(DataInfo info)
{
// When a new item is produced, just add it to the queue
queue.Add(info);
}
// To cancel the receiving thread, cancel the token
private void CancelReceivingThread()
{
receivingCts.Cancel();
}
就个人而言,对于简单的 producer-consumer 问题,我只会使用 BlockingCollection。无需手动编写您自己的同步逻辑代码。如果队列中没有项目,消费线程也会阻塞。
如果您使用此 class:
,您的代码可能如下所示private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();
public void Stop()
{
// No need for mIsRunning. Makes the enumerables in the GetConsumingEnumerable() calls
// below to complete.
mReceivingThreadQueue.CompleteAdding();
mSendingThreadQueue.CompleteAdding();
}
private void ReceivingThread()
{
foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
{
ProcessReceivingItem(item);
}
}
private void SendingThread()
{
foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
{
ProcessSendingItem(item);
}
}
internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
// You can also use TryAdd() if there is a possibility that you
// can add items after you have stopped. Otherwise, this can throw an
// an exception after CompleteAdding() has been called.
mReceivingThreadQueue.Add(info);
}
public void EnqueueSend(DataSendEnqeueInfo info)
{
mSendingThreadQueue.Add(info);
}
如评论中所建议,您也可以尝试使用 TPL Dataflow 块。
据我所知,您有两个类似的管道,用于接收和发送,所以我假设您的 class 层次结构是这样的:
class EnqueueInfo { }
class DataRecievedEnqeueInfo : EnqueueInfo { }
class DataSendEnqeueInfo : EnqueueInfo { }
我们可以 assemble 一个抽象 class 来封装创建管道的逻辑,并提供处理项目的接口,如下所示:
abstract class EnqueueInfoProcessor<T>
where T : EnqueueInfo
{
// here we will store all the messages received before the handling
private readonly BufferBlock<T> _buffer;
// simple action block for actual handling the items
private ActionBlock<T> _action;
// cancellation token to cancel the pipeline
public EnqueueInfoProcessor(CancellationToken token)
{
_buffer = new BufferBlock<T>(new DataflowBlockOptions { CancellationToken = token });
_action = new ActionBlock<T>(item => ProcessItem(item), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = token
});
// we are linking two blocks so all the items from buffer
// will flow down to action block in order they've been received
_buffer.LinkTo(_action, new DataflowLinkOptions { PropagateCompletion = true });
}
public void PostItem(T item)
{
// synchronously wait for posting to complete
_buffer.Post(item);
}
public async Task SendItemAsync(T item)
{
// asynchronously wait for message to be posted
await _buffer.SendAsync(item);
}
// abstract method to implement
protected abstract void ProcessItem(T item);
}
注意,你也可以使用Encapsulate<TInput, TOutput>
方法将link封装在两个block之间,但那样的话你必须妥善处理buffer block的Completion
,如果你正在使用它。
在此之后,我们只需要实现接收和发送句柄逻辑的两个方法:
public class SendEnqueueInfoProcessor : EnqueueInfoProcessor<DataSendEnqeueInfo>
{
SendEnqueueInfoProcessor(CancellationToken token)
: base(token)
{
}
protected override void ProcessItem(DataSendEnqeueInfo item)
{
// send logic here
}
}
public class RecievedEnqueueInfoProcessor : EnqueueInfoProcessor<DataRecievedEnqeueInfo>
{
RecievedEnqueueInfoProcessor(CancellationToken token)
: base(token)
{
}
protected override void ProcessItem(DataRecievedEnqeueInfo item)
{
// recieve logic here
}
}
您还可以使用 TransformBlock<DataRecievedEnqeueInfo, DataSendEnqeueInfo>
创建更复杂的管道,如果您的消息流是关于 ReceiveInfo
消息变成 SendInfo
。