通过串口数据无限producer/consumer
Infinite producer/consumer via serial port data
我目前正在控制台应用程序中通过异步 Task
中的 SerialPort
连接读取数据,理论上它将永远 运行 (总是拾取新的串行数据,因为它进来)。
我有一个单独的 Task
负责从 HashSet
类型中提取串行数据,该类型从我上面的“生产者”任务中填充,然后生成 API 用它来请求。由于“生产者”将永远 运行,我需要“消费者”任务也永远 运行 来处理它。
这是一个人为的例子:
TagItems = new HashSet<Tag>();
Sem = new SemaphoreSlim(1, 1);
SerialPort = new SerialPort("COM3", 115200, Parity.None, 8, StopBits.One);
// serialport settings...
try
{
var producer = StartProducerAsync(cancellationToken);
var consumer = StartConsumerAsync(cancellationToken);
await producer; // this feels weird
await consumer; // this feels weird
}
catch (Exception e)
{
Console.WriteLine(e); // when I manually throw an error in the consumer, this never triggers for some reason
}
生产者/消费者方法如下:
private async Task StartProducerAsync(CancellationToken cancellationToken)
{
using var reader = new StreamReader(SerialPort.BaseStream);
while (SerialPort.IsOpen)
{
var readData = await reader.ReadLineAsync()
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
var tag = new Tag {Data = readData};
await Sem.WaitAsync(cancellationToken);
TagItems.Add(tag);
Sem.Release();
await Task.Delay(100, cancellationToken);
}
reader.Close();
}
private async Task StartConsumerAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Sem.WaitAsync(cancellationToken);
if (TagItems.Any())
{
foreach (var item in TagItems)
{
await SendTagAsync(tag, cancellationToken);
}
}
Sem.Release();
await Task.Delay(1000, cancellationToken);
}
}
我认为我的解决方案存在多个问题,但我不太确定如何改进它。例如,我希望我的“数据”是唯一的,所以我使用 HashSet
,但该数据类型不是并发友好的,所以我不得不使用 SemaphoreSlim
锁定,我我猜测可能会出现大量数据流经的性能问题。
我也不确定为什么在 StartConsumerAsync
方法中抛出异常时我的 catch
块永远不会触发。
最后,是否有更好/更现代的模式可以用来以更好的方式解决同样的问题?我注意到 Channels
可能是一个选项,但我见过的很多 producer/consumer 示例都是从生产者开始,它必须“生产”固定数量的项目,而在我的示例中,生产者需要永远活着并可能无限生产。
首先,启动多个异步操作并一个一个等待它们是错误的:
// Wrong
await producer;
await consumer;
原因是如果第一次操作失败,第二次操作就会变成fire-and-forget。允许任务逃脱您的监督并继续 运行 无人值守,只会导致您的程序不稳定。没有什么好结果。
// Correct
await Task.WhenAll(producer, consumer)
现在关于您的主要问题,即如何确保一项任务的失败会导致另一项任务的及时完成。我的建议是将每个任务的失败与一个CancellationTokenSource
的取消挂钩。此外,两个任务都应该监视关联的CancellationToken
,并在收到取消信号后尽快协同完成。
var cts = new CancellationTokenSource();
Task producer = StartProducerAsync(cts.Token).OnErrorCancel(cts);
Task consumer = StartConsumerAsync(cts.Token).OnErrorCancel(cts);
await Task.WhenAll(producer, consumer)
这里是OnErrorCancel
扩展方法:
public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
{
return task.ContinueWith(t =>
{
if (t.IsFaulted) cts.Cancel();
return t;
}, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}
除此之外,您还可以在每个任务中添加一个 all-enclosing try
/catch
块,然后在 [=18= 中调用 cts.Cancel()
].
我目前正在控制台应用程序中通过异步 Task
中的 SerialPort
连接读取数据,理论上它将永远 运行 (总是拾取新的串行数据,因为它进来)。
我有一个单独的 Task
负责从 HashSet
类型中提取串行数据,该类型从我上面的“生产者”任务中填充,然后生成 API 用它来请求。由于“生产者”将永远 运行,我需要“消费者”任务也永远 运行 来处理它。
这是一个人为的例子:
TagItems = new HashSet<Tag>();
Sem = new SemaphoreSlim(1, 1);
SerialPort = new SerialPort("COM3", 115200, Parity.None, 8, StopBits.One);
// serialport settings...
try
{
var producer = StartProducerAsync(cancellationToken);
var consumer = StartConsumerAsync(cancellationToken);
await producer; // this feels weird
await consumer; // this feels weird
}
catch (Exception e)
{
Console.WriteLine(e); // when I manually throw an error in the consumer, this never triggers for some reason
}
生产者/消费者方法如下:
private async Task StartProducerAsync(CancellationToken cancellationToken)
{
using var reader = new StreamReader(SerialPort.BaseStream);
while (SerialPort.IsOpen)
{
var readData = await reader.ReadLineAsync()
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
var tag = new Tag {Data = readData};
await Sem.WaitAsync(cancellationToken);
TagItems.Add(tag);
Sem.Release();
await Task.Delay(100, cancellationToken);
}
reader.Close();
}
private async Task StartConsumerAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Sem.WaitAsync(cancellationToken);
if (TagItems.Any())
{
foreach (var item in TagItems)
{
await SendTagAsync(tag, cancellationToken);
}
}
Sem.Release();
await Task.Delay(1000, cancellationToken);
}
}
我认为我的解决方案存在多个问题,但我不太确定如何改进它。例如,我希望我的“数据”是唯一的,所以我使用 HashSet
,但该数据类型不是并发友好的,所以我不得不使用 SemaphoreSlim
锁定,我我猜测可能会出现大量数据流经的性能问题。
我也不确定为什么在 StartConsumerAsync
方法中抛出异常时我的 catch
块永远不会触发。
最后,是否有更好/更现代的模式可以用来以更好的方式解决同样的问题?我注意到 Channels
可能是一个选项,但我见过的很多 producer/consumer 示例都是从生产者开始,它必须“生产”固定数量的项目,而在我的示例中,生产者需要永远活着并可能无限生产。
首先,启动多个异步操作并一个一个等待它们是错误的:
// Wrong
await producer;
await consumer;
原因是如果第一次操作失败,第二次操作就会变成fire-and-forget。允许任务逃脱您的监督并继续 运行 无人值守,只会导致您的程序不稳定。没有什么好结果。
// Correct
await Task.WhenAll(producer, consumer)
现在关于您的主要问题,即如何确保一项任务的失败会导致另一项任务的及时完成。我的建议是将每个任务的失败与一个CancellationTokenSource
的取消挂钩。此外,两个任务都应该监视关联的CancellationToken
,并在收到取消信号后尽快协同完成。
var cts = new CancellationTokenSource();
Task producer = StartProducerAsync(cts.Token).OnErrorCancel(cts);
Task consumer = StartConsumerAsync(cts.Token).OnErrorCancel(cts);
await Task.WhenAll(producer, consumer)
这里是OnErrorCancel
扩展方法:
public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
{
return task.ContinueWith(t =>
{
if (t.IsFaulted) cts.Cancel();
return t;
}, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}
除此之外,您还可以在每个任务中添加一个 all-enclosing try
/catch
块,然后在 [=18= 中调用 cts.Cancel()
].