通过串口数据无限producer/consumer

Infinite producer/consumer via serial port data

我目前正在控制台应用程序中通过异步 Task 中的 SerialPort 连接读取数据,理论上它将永远 运行 (总是拾取新的串行数据,因为它进来)。

我有一个单独的 Task 负责从 HashSet 类型中提取串行数据,该类型从我上面的“生产者”任务中填充,然后生成 API 用它来请求。由于“生产者”将永远 运行,我需要“消费者”任务也永远 运行 来处理它。

这是一个人为的例子:

TagItems = new HashSet<Tag>();
Sem = new SemaphoreSlim(1, 1);
SerialPort = new SerialPort("COM3", 115200, Parity.None, 8, StopBits.One);
// serialport settings...

try
{
  var producer = StartProducerAsync(cancellationToken);
  var consumer = StartConsumerAsync(cancellationToken);

  await producer; // this feels weird
  await consumer; // this feels weird
}
catch (Exception e)
{
  Console.WriteLine(e); // when I manually throw an error in the consumer, this never triggers for some reason
}

生产者/消费者方法如下:

private async Task StartProducerAsync(CancellationToken cancellationToken)
{
    using var reader = new StreamReader(SerialPort.BaseStream);
    while (SerialPort.IsOpen)
    {
        var readData = await reader.ReadLineAsync()
            .WaitAsync(cancellationToken)
            .ConfigureAwait(false);

        var tag = new Tag {Data = readData};
        await Sem.WaitAsync(cancellationToken);
        TagItems.Add(tag);
        Sem.Release();

        await Task.Delay(100, cancellationToken);
    }

    reader.Close();
}

private async Task StartConsumerAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        await Sem.WaitAsync(cancellationToken);
        if (TagItems.Any())
        {
            foreach (var item in TagItems)
            {
                await SendTagAsync(tag, cancellationToken);
            }
        }

        Sem.Release();
        await Task.Delay(1000, cancellationToken);
    }
}

我认为我的解决方案存在多个问题,但我不太确定如何改进它。例如,我希望我的“数据”是唯一的,所以我使用 HashSet,但该数据类型不是并发友好的,所以我不得不使用 SemaphoreSlim 锁定,我我猜测可能会出现大量数据流经的性能问题。

我也不确定为什么在 StartConsumerAsync 方法中抛出异常时我的 catch 块永远不会触发。

最后,是否有更好/更现代的模式可以用来以更好的方式解决同样的问题?我注意到 Channels 可能是一个选项,但我见过的很多 producer/consumer 示例都是从生产者开始,它必须“生产”固定数量的项目,而在我的示例中,生产者需要永远活着并可能无限生产。

首先,启动多个异步操作并一个一个等待它们是错误的:

// Wrong
await producer;
await consumer;

原因是如果第一次操作失败,第二次操作就会变成fire-and-forget。允许任务逃脱您的监督并继续 运行 无人值守,只会导致您的程序不稳定。没有什么好结果。

// Correct
await Task.WhenAll(producer, consumer)

现在关于您的主要问题,即如何确保一项任务的失败会导致另一项任务的及时完成。我的建议是将每个任务的失败与一个CancellationTokenSource的取消挂钩。此外,两个任务都应该监视关联的CancellationToken,并在收到取消信号后尽快协同完成。

var cts = new CancellationTokenSource();
Task producer = StartProducerAsync(cts.Token).OnErrorCancel(cts);
Task consumer = StartConsumerAsync(cts.Token).OnErrorCancel(cts);
await Task.WhenAll(producer, consumer)

这里是OnErrorCancel扩展方法:

public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
{
    return task.ContinueWith(t =>
    {
        if (t.IsFaulted) cts.Cancel();
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap();
}

除此之外,您还可以在每个任务中添加一个 all-enclosing try/catch 块,然后在 [=18= 中调用 cts.Cancel() ].