如何在 API 中向客户端代码公开 class 实现生产者消费者管道?

How to expose a class implementing producer consumer pipeline to client code in an API?

我对如何向客户端代码公开实现生产者消费者管道的 class 感到有点困惑。

假设我有两个 classes 代表生产者和消费者,像这样:

public class Consumer
{
  ...

  public void Cosume();

  ...
}

那么制作人:

public class Producer
{
  ...

  public void Produce();

  ...
}

第三个 class 协调生产者和消费者(顺便说一句,问题来了)

public class ProducerConsumer
{
  ...

  private Producer producer;
  private Consumer consumer;

  ...

  public void Start()
  {
     ...
  }
}

我应该如何开始?

我想调用 producer.Produce() 和 consumer.Consume() 包裹在 Task.Run 中等待它们完成,就像这样:

public async Task Start()
{
     await Task.WhenAll(Task.Run(() => producer.Produce(), Task.Run(() => consumer.Consume());
}

但我读到在实现中使用 Task.Run() 并不是一个很好的做法,所以我放弃了它。

我也想到了 Parallel.Invoke(),将 Parallel.Invoke() 的阻塞等待卸载到另一个线程的责任留给客户端代码,如下所示:

public void Start()
{
      Parallel.Invoke(() => producer.Produce(), () => consumer.Consume());
}

客户端代码会做类似的事情:

public async void ButtonHandler(object sender, RoutedEventArgs e)
{
   await Task.Run(() => producerConsumer.Start());
}

但是卸载到另一个线程只是为了等待另外两个线程完成对我来说有点奇怪,因为我只是在浪费一个线程来等待一些事情。

阅读 Whosebug 上的其他问题后,许多人建议将如何调用并行代码的责任留给调用代码,所以我想从 ProducerConsumer class 中公开 Producer 和 Consumer 并让客户端代码以它想要的方式调用 Producer.Produce() 和 Consumer.Consume(),或多或少像:

public async void ButtonHandler(object sender, RoutedEventArgs e)
{
   Task producerTask = Task.Run(() => producerConsumer.Producer.Produce());
   Task consumerTask = Task.Run(() => producerConsumer.Consumer.Consumer());

   await Task.WhenAll(producerTask, consumerTask);
}

但最后一个实现对我来说看起来很尴尬,因为调用者代码负责调用 Produce() 和 Consume(),如果省略这两个方法之一,可能会导致错误。

我知道 TPL.DataFlow 可以实现生产者消费者管道,但我无法将外部依赖项添加到库中。

那么Start()方法应该怎么写呢?

一般情况下:我应该如何将本质上并行的代码公开给库客户端代码?

如果您有一个 async 兼容的 producer/consumer 队列(例如,来自 TPL Dataflow 的 BufferBlock<T>),并且您的生产者受到限制(例如,队列具有合理的最大数量的元素,或者生产者的数据来自一些 I/O 操作),那么你可以让你的生产者和消费者 async 并像这样直接调用它们:

public async Task ExecuteAsync()
{
  await Task.WhenAll(producer.ProduceAsync(), consumer.ConsumeAsync())
      .ConfigureAwait(false);
}

否则,您将不得不"give"某处。如果您的生产者和消费者是同步的,那么根据定义,您的包含 class 控制两个线程。在这种情况下,可以使用 Task.Run:

public async Task ExecuteAsync()
{
  await Task.WhenAll(Task.Run(() => producer.Produce()), Task.Run(() => consumer.Consume()))
      .ConfigureAwait(false);
}