How can I have an async stream return with 2 data sources

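I have the following function that returns the standard output data of a running System.Diagnostics.Process as an async stream. Everything in the current method works as expected: I can call it in an await foreach() loop and I get each line of output as it is produced by the external exe.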

private static async IAsyncEnumerable<string> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handler back to this method
   BufferBlock<string> dataBuffer = new BufferBlock<string>();

   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         dataBuffer.Complete();
      }
      else
      {
         dataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line  
   while (await dataBuffer.OutputAvailableAsync())
      yield return dataBuffer.Receive();
}

My problem is that I now need it to return both the standard output and the standard error results. I made this class to hold the data from each stream.

public class ProcessData
{
   public string Error { get; set; } = "";
   public string Output { get; set; } = "";
}

and changed ProcessAsyncStream() to look like this:

private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handlers back to this method
   BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
   BufferBlock<string> errorDataBuffer = new BufferBlock<string>();

   
   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         outputDataBuffer.Complete();
      }
      else
      {
         outputDataBuffer.Post(e.Data);
      }
   };

   process.ErrorDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         errorDataBuffer.Complete();
      }
      else
      {
         errorDataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();
   process.BeginErrorReadLine();

   // Return data line by line
   while (await outputDataBuffer.OutputAvailableAsync()
          || await errorDataBuffer.OutputAvailableAsync())
      yield return new ProcessData() 
      {
         Error = errorDataBuffer.Receive(), 
         Output = outputDataBuffer.Receive()
      };
}

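The problem is that if one of the buffers completes before the other, the method hangs, because .Receive() on that buffer has no data to receive. If I change the while condition to &&, then I don't get all the data from the other buffer.

Any suggestions?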

You could use a single buffer of ProcessData items:

var buffer = new BufferBlock<ProcessData>();

and then use a custom Complete mechanism that completes the buffer only after both events have propagated a null value:

process.OutputDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(1);
    else buffer.Post(new ProcessData() { Output = e.Data });
};

process.ErrorDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(2);
    else buffer.Post(new ProcessData() { Error = e.Data });
};

Here is an implementation of the Complete method:

bool[] completeState = new bool[2];
void Complete(int index)
{
    bool completed;
    lock (completeState.SyncRoot)
    {
        completeState[index - 1] = true;
        completed = completeState.All(v => v);
    }
    if (completed) buffer.Complete();
}
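Putting the pieces of this answer together, here is a minimal sketch that can be exercised without starting a process. `MergedProcessBuffer`, `OnOutput`, `OnError` and `ReadAllAsync` are hypothetical names; the handlers take the `e.Data` string from `DataReceivedEventArgs`, and the sketch assumes `System.Threading.Tasks.Dataflow` is referenced.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Same shape as the ProcessData class from the question.
public class ProcessData
{
    public string Error { get; set; } = "";
    public string Output { get; set; } = "";
}

// Hypothetical wrapper: one BufferBlock fed by two producers (the stdout and
// stderr handlers) and completed only after both have reported end-of-stream.
public class MergedProcessBuffer
{
    private readonly BufferBlock<ProcessData> _buffer = new BufferBlock<ProcessData>();
    private readonly bool[] _completeState = new bool[2];

    // Pass e.Data from OutputDataReceived; null means stdout has ended.
    public void OnOutput(string? data)
    {
        if (data is null) Complete(0);
        else _buffer.Post(new ProcessData { Output = data });
    }

    // Pass e.Data from ErrorDataReceived; null means stderr has ended.
    public void OnError(string? data)
    {
        if (data is null) Complete(1);
        else _buffer.Post(new ProcessData { Error = data });
    }

    private void Complete(int index)
    {
        bool completed;
        lock (_completeState)
        {
            _completeState[index] = true;
            completed = _completeState.All(v => v);
        }
        if (completed) _buffer.Complete();
    }

    // Single consumer: OutputAvailableAsync returns false only once the block
    // is completed and empty, so buffered items are always drained first.
    public async IAsyncEnumerable<ProcessData> ReadAllAsync()
    {
        while (await _buffer.OutputAvailableAsync())
        {
            while (_buffer.TryReceive(out var item))
                yield return item;
        }
    }
}
```

In the real method the handlers would be wired as `process.OutputDataReceived += (s, e) => merged.OnOutput(e.Data);` (and likewise for `ErrorDataReceived`), with both `BeginOutputReadLine()` and `BeginErrorReadLine()` called after `Start()`. Because each item carries only one of the two strings, the consumer never waits on a stream that has nothing to say.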

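As for the actual problem, the flow for reading the blocks is broken: each loop iteration takes one item from each block in lockstep, so it stalls as soon as one of them has nothing to give. The simplest solution is to use just one buffer with multiple producers and a single consumer, combined with a message wrapper.

The conceptual problem you are trying to solve with Dataflow blocks comes from the nature of events and async streams: events are pushed, whereas async streams are pulled.

There are several ways to map one onto the other, but I think the most elegant is to use an unbounded Channel as the buffer.

Channels are more modern than Dataflow, have fewer degrees of freedom, are less clunky than a BufferBlock, and are very lightweight and highly optimized. In addition, I would pass a wrapper type for the different kinds of responses.

Ignoring any other issues (conceptual or otherwise):

Given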
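To make the push/pull distinction concrete, here is a minimal sketch of bridging one event-based source into an async stream through an unbounded channel. `LineSource`, `PushToPull` and `ReadLinesAsync` are hypothetical: `LineSource` stands in for `Process`, raising one event per line and then `null`, the same end-of-stream convention `OutputDataReceived` uses. It assumes .NET Core 3.0 or later for C# 8 async streams.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical push-based source standing in for Process: raises one event
// per line from a background thread, then a final null to signal the end.
public class LineSource
{
    private readonly IEnumerable<string> _lines;

    public LineSource(IEnumerable<string> lines) => _lines = lines;

    public event Action<string?>? LineReceived;

    public Task Start() => Task.Run(() =>
    {
        foreach (var line in _lines) LineReceived?.Invoke(line);
        LineReceived?.Invoke(null);
    });
}

public static class PushToPull
{
    // Pull side: callers consume this with await foreach at their own pace.
    public static async IAsyncEnumerable<string> ReadLinesAsync(LineSource source)
    {
        var channel = Channel.CreateUnbounded<string>();

        // Push side: the handler never blocks, it only hands items to the channel.
        void Handler(string? data)
        {
            if (data is null) channel.Writer.TryComplete();
            else channel.Writer.TryWrite(data);
        }

        source.LineReceived += Handler;
        try
        {
            var producer = source.Start();

            await foreach (var line in channel.Reader.ReadAllAsync())
                yield return line;

            await producer; // surface any exception thrown by the source
        }
        finally
        {
            source.LineReceived -= Handler;
        }
    }
}
```

On an unbounded channel `TryWrite` only fails once the channel has been completed, so the thread raising the event is never held up by a slow consumer; the trade-off is that memory grows if the consumer falls behind.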

public enum MessageType
{
   Output,
   Error
}

public class Message
{
   public MessageType MessageType { get; set; }
   public string Data { get; set; }

   public Message(string data, MessageType messageType )
   {
      Data = data;
      MessageType = messageType;
   }
}

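Usage

private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
     ProcessStartInfo processStartInfo,
     [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
   using var process = new Process() { StartInfo = processStartInfo };

   var ch = Channel.CreateUnbounded<Message>();
   var completeCount = 0;

   void OnReceived(string data, MessageType type)
   {
      if (data is null)
      {
         // Each stream signals its end with one null, and the two events can be
         // raised on different threads, so count atomically and complete the
         // channel only when both streams have ended
         if (Interlocked.Increment(ref completeCount) == 2)
            ch.Writer.Complete();
      }
      else
      {
         // Unbounded channel: TryWrite succeeds until the channel is completed,
         // so there is no un-awaited WriteAsync inside the event handler
         ch.Writer.TryWrite(new Message(data, type));
      }
   }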

   process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
   process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);

   // start the process 
   // ...

   await foreach (var message in ch.Reader
           .ReadAllAsync(cancellationToken)
           .ConfigureAwait(false))
      yield return message;

   // cleanup
   // ...
}

Note: completely untested.

Complete the buffer on exit instead:
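Since the answer is untested, here is a minimal sketch of just its completion logic, pulled into a hypothetical `MessagePump` class so it can be exercised without a process. It handles the end-of-stream `null` explicitly: the first stream to finish is counted but never written as a message, and the channel completes only when both streams have ended. The `Message` and `MessageType` types follow the answer's definitions, with read-only properties.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

public enum MessageType
{
    Output,
    Error
}

public class Message
{
    public MessageType MessageType { get; }
    public string Data { get; }

    public Message(string data, MessageType messageType)
    {
        Data = data;
        MessageType = messageType;
    }
}

// Hypothetical helper holding the channel and the end-of-stream counter.
public class MessagePump
{
    private readonly Channel<Message> _channel = Channel.CreateUnbounded<Message>();
    private int _completeCount;

    // Wire to both events, e.g. OnReceived(args.Data, MessageType.Output).
    public void OnReceived(string? data, MessageType type)
    {
        if (data is null)
        {
            // The two events can be raised on different threads, so the
            // end-of-stream count is updated atomically; the second null
            // completes the channel.
            if (Interlocked.Increment(ref _completeCount) == 2)
                _channel.Writer.TryComplete();
        }
        else
        {
            // On an unbounded channel TryWrite only fails once completed.
            _channel.Writer.TryWrite(new Message(data, type));
        }
    }

    public IAsyncEnumerable<Message> ReadAllAsync(CancellationToken cancellationToken = default)
        => _channel.Reader.ReadAllAsync(cancellationToken);
}
```

Inside the iterator method the handlers would become `process.OutputDataReceived += (_, args) => pump.OnReceived(args.Data, MessageType.Output);` (and the same for `ErrorDataReceived`), and the `await foreach` would read from `pump.ReadAllAsync(cancellationToken)`.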

void HandleData(object sender, DataReceivedEventArgs e)
{
    if (e.Data != null) dataBuffer.Post(e.Data);
}

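process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;

// Exited is only raised when EnableRaisingEvents is true
process.EnableRaisingEvents = true;
process.Exited += (s, e) => 
{
    // The parameterless WaitForExit() also waits for the asynchronous
    // output handlers to finish before the buffer is completed
    process.WaitForExit();
    dataBuffer.Complete();
};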
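A minimal sketch of the same idea, with the process simulated so it can run on its own and with a channel swapped in for the question's `BufferBlock<string>` (a substitution, not what the answer uses). `ExitCompletedBuffer`, `HandleData` and `OnExited` are hypothetical names: `HandleData` drops the `null` end-of-stream markers, and only `OnExited`, which real code would call from the `Exited` handler after the parameterless `WaitForExit()` returns, completes the channel. As with the snippet above, stdout and stderr lines become indistinguishable; a tagged wrapper such as the `Message` class from the previous answer keeps them apart.

```csharp
#nullable enable
using System.Threading.Channels;

// Hypothetical sketch: one channel shared by both output handlers; completion
// is driven by the exit notification rather than by the per-stream nulls.
public class ExitCompletedBuffer
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();

    public ChannelReader<string> Reader => _channel.Reader;

    // Wire to both OutputDataReceived and ErrorDataReceived (pass e.Data).
    // The null end-of-stream markers are ignored.
    public void HandleData(string? data)
    {
        if (data != null) _channel.Writer.TryWrite(data);
    }

    // In real code: call from the Exited handler after process.WaitForExit(),
    // which waits for the asynchronous output handlers to finish.
    public void OnExited() => _channel.Writer.TryComplete();
}
```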