C# 在 Producer/Consumer 中等待多个事件
C# Await Multiple Events in Producer/Consumer
我正在使用 producer/consumer pattern 实现数据 link 层。数据 link 层有自己的线程和状态机来通过线路(以太网、RS-232...)传输数据 link 协议。物理层的接口表示为 System.IO.Stream。另一个线程将消息写入数据 link 对象并从中读取消息。
数据 link 对象处于空闲状态,必须等待以下四个条件之一:
- 收到一个字节
- 网络线程中有一条消息可用
- 保活定时器已过期
- 所有通信被网络层取消
我很难找出最好的方法来做到这一点,而不会将通信分成 read/write 线程(从而显着增加复杂性)。以下是我如何从 4 中得到 3:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
或
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
我应该怎么做才能阻塞线程,等待所有四个条件?欢迎所有建议。我不喜欢任何架构或数据结构。
编辑:******** 感谢大家的帮助。这是我的解决方案 *********
首先,我认为 producer/consumer 队列没有异步实现。所以我实现了类似于 this Whosebug post.
的东西
我需要一个外部和内部取消源来分别停止消费者线程和取消中间任务,similar to this article。
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
取消读取会让您无法知道数据是否已读取。取消和阅读彼此不是原子的。该方法仅在取消后关闭流时才有效。
队列方法更好。您可以创建一个 linked CancellationTokenSource
,它会在您需要时随时取消。您传递的不是 cts.Token
,而是您控制的令牌。
然后您可以根据时间、另一个令牌和您喜欢的任何其他事件向该令牌发出信号。如果您使用内置超时,队列将在内部对超时的传入令牌执行相同的操作link。
我会选择你的选项二,等待 4 个条件中的任何一个发生。假设您已经将 4 个任务作为可等待的方法:
var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();
// now gather them
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{
task1, task2, task3, task4
};
// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);
上面的代码将在第一个任务完成后立即恢复,并忽略其他 3 个。
注:
在the documentation for WhenAny中说:
返回的任务将始终以 RanToCompletion 状态结束,其结果设置为要完成的第一个任务。即使要完成的第一个任务以已取消或已故障状态结束也是如此。
因此,在相信所发生的事情之前,请务必进行最终检查:
if(result.Result.Result == true)
... // First Result is the task, the second is the bool that the task returns
在这种情况下,我只会使用取消令牌进行取消。像保持活动计时器这样的重复超时最好表示为计时器。
因此,我会将其建模为三个可取消的任务。一、注销令牌:
All communication was cancelled by the network layer
CancellationToken token = ...;
然后,三个并发操作:
A byte is received
var readByteTask = stream.ReadAsync(buf, 0, 1, token);
The keep-alive timer has expired
var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
A message is available from the network thread
这个有点棘手。您当前的代码使用不异步兼容的 BlockingCollection<T>
。我建议切换到 TPL Dataflow's BufferBlock<T>
or my own AsyncProducerConsumerQueue<T>
,其中任何一个都可以用作异步兼容 producer/consumer 队列(意味着生产者可以同步或异步,消费者可以同步或异步)。
BufferBlock<byte[]> SendQueue = new ...;
...
var messageTask = SendQueue.ReceiveAsync(token);
然后您可以使用Task.WhenAny
来确定这些任务中的哪些已完成:
var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask);
现在,您可以通过将 completedTask
与其他人进行比较并 await
对它们进行比较来检索结果:
if (completedTask == readByteTask)
{
// Throw an exception if there was a read error or cancellation.
await readByteTask;
var byte = buf[0];
...
// Continue reading
readByteTask = stream.ReadAsync(buf, 0, 1, token);
}
else if (completedTask == keepAliveTimerTask)
{
// Throw an exception if there was a cancellation.
await keepAliveTimerTask;
...
// Restart keepalive timer.
keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
}
else if (completedTask == messageTask)
{
// Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
byte[] message = await messageTask;
...
// Continue reading
messageTask = SendQueue.ReceiveAsync(token);
}
我正在使用 producer/consumer pattern 实现数据 link 层。数据 link 层有自己的线程和状态机来通过线路(以太网、RS-232...)传输数据 link 协议。物理层的接口表示为 System.IO.Stream。另一个线程将消息写入数据 link 对象并从中读取消息。
数据 link 对象处于空闲状态,必须等待以下四个条件之一:
- 收到一个字节
- 网络线程中有一条消息可用
- 保活定时器已过期
- 所有通信被网络层取消
我很难找出最好的方法来做到这一点,而不会将通信分成 read/write 线程(从而显着增加复杂性)。以下是我如何从 4 中得到 3:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
或
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
我应该怎么做才能阻塞线程,等待所有四个条件?欢迎所有建议。我不喜欢任何架构或数据结构。
编辑:******** 感谢大家的帮助。这是我的解决方案 *********
首先,我认为 producer/consumer 队列没有异步实现。所以我实现了类似于 this Whosebug post.
的东西我需要一个外部和内部取消源来分别停止消费者线程和取消中间任务,similar to this article。
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
取消读取会让您无法知道数据是否已读取。取消和阅读彼此不是原子的。该方法仅在取消后关闭流时才有效。
队列方法更好。您可以创建一个 linked CancellationTokenSource
,它会在您需要时随时取消。您传递的不是 cts.Token
,而是您控制的令牌。
然后您可以根据时间、另一个令牌和您喜欢的任何其他事件向该令牌发出信号。如果您使用内置超时,队列将在内部对超时的传入令牌执行相同的操作link。
我会选择你的选项二,等待 4 个条件中的任何一个发生。假设您已经将 4 个任务作为可等待的方法:
var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();
// now gather them
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{
task1, task2, task3, task4
};
// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);
上面的代码将在第一个任务完成后立即恢复,并忽略其他 3 个。
注:
在the documentation for WhenAny中说:
返回的任务将始终以 RanToCompletion 状态结束,其结果设置为要完成的第一个任务。即使要完成的第一个任务以已取消或已故障状态结束也是如此。
因此,在相信所发生的事情之前,请务必进行最终检查:
if(result.Result.Result == true)
... // First Result is the task, the second is the bool that the task returns
在这种情况下,我只会使用取消令牌进行取消。像保持活动计时器这样的重复超时最好表示为计时器。
因此,我会将其建模为三个可取消的任务。一、注销令牌:
All communication was cancelled by the network layer
CancellationToken token = ...;
然后,三个并发操作:
A byte is received
var readByteTask = stream.ReadAsync(buf, 0, 1, token);
The keep-alive timer has expired
var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
A message is available from the network thread
这个有点棘手。您当前的代码使用不异步兼容的 BlockingCollection<T>
。我建议切换到 TPL Dataflow's BufferBlock<T>
or my own AsyncProducerConsumerQueue<T>
,其中任何一个都可以用作异步兼容 producer/consumer 队列(意味着生产者可以同步或异步,消费者可以同步或异步)。
BufferBlock<byte[]> SendQueue = new ...;
...
var messageTask = SendQueue.ReceiveAsync(token);
然后您可以使用Task.WhenAny
来确定这些任务中的哪些已完成:
var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask);
现在,您可以通过将 completedTask
与其他人进行比较并 await
对它们进行比较来检索结果:
if (completedTask == readByteTask)
{
// Throw an exception if there was a read error or cancellation.
await readByteTask;
var byte = buf[0];
...
// Continue reading
readByteTask = stream.ReadAsync(buf, 0, 1, token);
}
else if (completedTask == keepAliveTimerTask)
{
// Throw an exception if there was a cancellation.
await keepAliveTimerTask;
...
// Restart keepalive timer.
keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
}
else if (completedTask == messageTask)
{
// Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
byte[] message = await messageTask;
...
// Continue reading
messageTask = SendQueue.ReceiveAsync(token);
}