有没有办法在 IAsyncEnumerator 中使用 System.Diagnostics.Process?

Is there a way to use System.Diagnostics.Process in an IAsyncEnumerator?

首先,我的目标是 .Net Core 3.1 和 C#8。

我想要这样的东西。

public static async Task<MyDataObj> GetData()
{
  var dataObj = new MyDataObj();
  var args = ArgsHelperFunction(...);
  
  await foreach(string result in RunProcessAsync(args))
  {
     // Process result and store it in dataObj
  }

  return dataObj;
}

private static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
  await using (var myProcess = new Process())
  {
    myProcess.StartInfo.FileName = @"path\to\file.exe"
    myProcess.StartInfo.Arguments = args;
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardError = true;
    myProcess.StartInfo.CreateNoWindow = true;

    myProcess.ErrorDataReceived += (s, e) =>
    {
      yield return e.Data;
    }

    myProcess.Start();
    myProcess.BeginErrorReadLine();
    process.WaitforExit();
  }
}

当我尝试此设置时,我收到来自 await foreach(string result in RunProcessAsync(args))

的错误

CS8417 'Process': type used in an asynchronous using statement must be implicitly convertible to 'System.IAsyncDisposable' or implement a suitable 'DisposeAsync' method.

这个错误来自 yield return e.Data;

CS1621 The yield statement cannot be used inside an anonymous method or lambda expression

目标就是这个。我有一个执行某些操作并将信息写入错误输出流的 exe(不确定这是不是它的真实名称)。我想在写入时接收这些写入,解析它们以获得我想要的信息并将它们存储在一个对象中供以后使用。

我是一个非常新手的编码员,对异步编码非常陌生。我以同步方式测试了 RunProcessAsync 的功能;它被调用的地方,只是将所有原始数据写入输出 window 而没有将任何数据返回给调用方法。那工作得很好。另外,我得到了一个使用 IAsyncEnumerable 的测试异步流,但它只使用了 Task.Delay 并返回了一些整数。现在我正在尝试将这些东西结合起来,但我缺乏经验阻碍了我。

感谢大家提供的任何帮助以及帮助提高 C# 的技能和知识。

没有 minimal, reproducible example,就不可能完全解决您的问题。但是我们可以处理您提出的两个具体问题。

首先,如果您的对象(例如Process)不支持IAsyncDisposable,那么就不要使用它。请改用同步 using 语句。

就方法中的 yield return 而言,如果您花点时间,您可能会发现您尝试编写的内容没有任何意义。事件处理程序(一种完全不同的方法)如何能够使当前方法产生新值?您需要事件处理程序在事件发生时向当前方法发出信号。您可以通过多种方式执行此操作,但 SemaphoreSlim 是更直接的方法之一。

把它们放在一起,你可能会得到这样的东西:

private static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
  using (var myProcess = new Process())
  {
    myProcess.StartInfo.FileName = @"path\to\file.exe";
    myProcess.StartInfo.Arguments = args;
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardError = true;
    myProcess.StartInfo.CreateNoWindow = true;

    ConcurrentQueue<string> dataQueue = new ConcurrentQueue<string>();
    SemaphoreSlim dataSemaphore = new SemaphoreSlim(0);

    myProcess.ErrorDataReceived += (s, e) =>
    {
      dataQueue.Enqueue(e.Data);
      dataSemaphore.Release();
    }

    myProcess.Start();
    myProcess.BeginErrorReadLine();

    while (true)
    {
      await dataSemaphore.WaitAsync();

      // Only one consumer, so this will always succeed
      dataQueue.TryDequeue(out string data);

      if (data == null) break;

      yield return data;
    }
  }
}

由于您没有提供实际的 MCVE,因此我无法尝试从头开始重建您的场景。所以上面没有编译,更不用说测试了。但它应该能说明要点。

也就是说,您需要使迭代器方法保持异步(这意味着您不能阻塞对 WaitForExit() 的调用),并且您需要以某种方式移动 [=18= 接收到的数据] 事件处理程序返回到迭代器方法。在上面,我将 thread-safe 队列对象与信号量结合使用。

每次接收到一行数据时,事件处理程序中的信号量计数都会增加(通过Release()),然后迭代器方法通过减少信号量计数(通过WaitAsync()) 并返回收到的行。

这里还有很多其他机制可以用于 producer/consumer 方面。来自 TPL Dataflow 的 a well-received Q&A here that discusses async-compatible mechanisms, including a custom version of BlockingCollection<T> that supports async operations, and a mention of the BufferBlock<T> class

这是一个使用 BufferBlock<T> 的示例(其语义与 BlockingCollection<T> 非常相似,但包括消费代码的异步处理):

static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
    using (var process = new Process())
    {
        myProcess.StartInfo.FileName = @"path\to\file.exe";
        process.StartInfo.Arguments = args;
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.RedirectStandardError = true;
        process.StartInfo.CreateNoWindow = true;

        BufferBlock<string> dataBuffer = new BufferBlock<string>();

        process.ErrorDataReceived += (s, e) =>
        {
            if (e.Data != null)
            {
                dataBuffer.Post(e.Data);
            }
            else
            {
                dataBuffer.Complete();
            }
        };

        process.Start();
        process.BeginErrorReadLine();

        while (await dataBuffer.OutputAvailableAsync())
        {
            yield return dataBuffer.Receive();
        }
    }
}

是这样的吗?

using CliWrap;
using CliWrap.EventStream;

var cmd = Cli.Wrap("foo").WithArguments("bar");

await foreach (var cmdEvent in cmd.ListenAsync())
{
    switch (cmdEvent)
    {
        case StartedCommandEvent started:
            _output.WriteLine($"Process started; ID: {started.ProcessId}");
            break;
        case StandardOutputCommandEvent stdOut:
            _output.WriteLine($"Out> {stdOut.Text}");
            break;
        case StandardErrorCommandEvent stdErr:
            _output.WriteLine($"Err> {stdErr.Text}");
            break;
        case ExitedCommandEvent exited:
            _output.WriteLine($"Process exited; Code: {exited.ExitCode}");
            break;
    }
}

这在 CliWrap 中可用。