我怎样才能拥有一个带有 2 个数据源的异步流 return
How can I have an async stream return with 2 data sources
我有以下功能,returns 标准输出数据,作为异步流,来自 运行 a System.Diagnostics.Process
。当前方法中的所有内容都按预期工作;我可以在 await foreach()
循环中调用它,我得到的每一行输出都是由外部 exe 生成的。
private static async IAsyncEnumerable<string> ProcessAsyncStream (
ProcessStartInfo processStartInfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processStartInfo };
// Buffer used to pass data from event-handler back to this method
BufferBlock<string> dataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null)
{
dataBuffer.Complete();
}
else
{
dataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginOutputReadLine();
// Return data line by line
while (await dataBuffer.OutputAvailableAsync())
yield return dataBuffer.Receive();
}
我的问题是现在我需要它 return 标准输出和标准错误结果。我做了这个 class 来保存来自每个流的数据。
public class ProcessData
{
public string Error { get; set; } = "";
public string Output { get; set; } = "";
}
并将 ProcessAsyncStream()
更改为如下所示
private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
ProcessStartInfo processStartInfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processStartInfo };
// Buffer used to pass data from event-handlers back to this method
BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
BufferBlock<string> errorDataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null)
{
outputDataBuffer.Complete();
}
else
{
outputDataBuffer.Post(e.Data);
}
};
process.ErrorDataReceived += (s, e) =>
{
if (e.Data is null)
{
errorDataBuffer.Complete();
}
else
{
errorDataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginOutputReadLine();
// Return data line by line
while (await outputDataBuffer.OutputAvailableAsync()
|| await errorDataBuffer.OutputAvailableAsync())
yield return new ProcessData()
{
Error = errorDataBuffer.Receive(),
Output = outputDataBuffer.Receive()
}
}
问题在于,如果其中一个缓冲区先于另一个缓冲区完成,则方法会挂起,因为该缓冲区的 .Receive()
没有任何数据可接收。如果我将 while
条件更改为 &&
那么我将不会从另一个缓冲区获取所有数据。
有什么建议吗?
您可以使用 ProcessData
项的单个缓冲区:
var buffer = new BufferBlock<ProcessData>();
然后使用自定义 Complete
机制在两个事件都传播了 null
值时完成缓冲区:
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null) Complete(1);
else buffer.Post(new ProcessData() { Output = e.Data });
};
process.ErrorDataReceived += (s, e) =>
{
if (e.Data is null) Complete(2);
else buffer.Post(new ProcessData() { Error = e.Data });
};
下面是 Complete
方法的实现:
bool[] completeState = new bool[2];
void Complete(int index)
{
bool completed;
lock (completeState.SyncRoot)
{
completeState[index - 1] = true;
completed = completeState.All(v => v);
}
if (completed) buffer.Complete();
}
针对实际问题,读取区块的流程有问题。最简单的解决方案是只使用一个 buffer 与多个 producer 和一个 consumer 结合消息包.
您试图用 DataFlow 块 解决的概念问题是 events 和 异步流。 事件被推送,异步流被拉取。
有几种解决方案可以将它们映射在一起,但我认为最优雅的方法是使用 Unbounded Channel 作为 buffer.
Channels 比 DataFlow 更现代,自由度更小,比 BufferBlock
更简洁,而且非常轻巧且高度优化。此外,我会为不同的响应类型传递一个 wrapper。
忽略任何其他问题(概念性或其他方面)。
给定
public enum MessageType
{
Output,
Error
}
public class Message
{
public MessageType MessageType { get; set; }
public string Data { get; set; }
public Message(string data, MessageType messageType )
{
Data = data;
MessageType = messageType;
}
}
用法
private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
ProcessStartInfo processStartInfo,
CancellationToken cancellationToken)
{
using var process = new Process() { StartInfo = processStartInfo };
var ch = Channel.CreateUnbounded<Message>();
var completeCount = 0;
void OnReceived(string data, MessageType type)
{
// The Interlocked memory barrier is likely overkill here
if (data is null && Interlocked.Increment(ref completeCount) == 2)
ch?.Writer.Complete();
else
ch?.Writer.WriteAsync(new Message(data, type), cancellationToken);
}
process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);
// start the process
// ...
await foreach (var message in ch.Reader
.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
yield return message;
// cleanup
// ...
}
注意:完全未经测试
而是在退出时完成。
void HandleData(object sender, DataReceivedEventArgs e)
{
if (e.Data != null) dataBuffer.Post(e.Data);
}
process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) =>
{
process.WaitForExit();
dataBuffer.Complete();
};
我有以下功能,returns 标准输出数据,作为异步流,来自 运行 a System.Diagnostics.Process
。当前方法中的所有内容都按预期工作;我可以在 await foreach()
循环中调用它,我得到的每一行输出都是由外部 exe 生成的。
private static async IAsyncEnumerable<string> ProcessAsyncStream (
ProcessStartInfo processStartInfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processStartInfo };
// Buffer used to pass data from event-handler back to this method
BufferBlock<string> dataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null)
{
dataBuffer.Complete();
}
else
{
dataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginOutputReadLine();
// Return data line by line
while (await dataBuffer.OutputAvailableAsync())
yield return dataBuffer.Receive();
}
我的问题是现在我需要它 return 标准输出和标准错误结果。我做了这个 class 来保存来自每个流的数据。
public class ProcessData
{
public string Error { get; set; } = "";
public string Output { get; set; } = "";
}
并将 ProcessAsyncStream()
更改为如下所示
private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
ProcessStartInfo processStartInfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processStartInfo };
// Buffer used to pass data from event-handlers back to this method
BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
BufferBlock<string> errorDataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null)
{
outputDataBuffer.Complete();
}
else
{
outputDataBuffer.Post(e.Data);
}
};
process.ErrorDataReceived += (s, e) =>
{
if (e.Data is null)
{
errorDataBuffer.Complete();
}
else
{
errorDataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginOutputReadLine();
// Return data line by line
while (await outputDataBuffer.OutputAvailableAsync()
|| await errorDataBuffer.OutputAvailableAsync())
yield return new ProcessData()
{
Error = errorDataBuffer.Receive(),
Output = outputDataBuffer.Receive()
}
}
问题在于,如果其中一个缓冲区先于另一个缓冲区完成,则方法会挂起,因为该缓冲区的 .Receive()
没有任何数据可接收。如果我将 while
条件更改为 &&
那么我将不会从另一个缓冲区获取所有数据。
有什么建议吗?
您可以使用 ProcessData
项的单个缓冲区:
var buffer = new BufferBlock<ProcessData>();
然后使用自定义 Complete
机制在两个事件都传播了 null
值时完成缓冲区:
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null) Complete(1);
else buffer.Post(new ProcessData() { Output = e.Data });
};
process.ErrorDataReceived += (s, e) =>
{
if (e.Data is null) Complete(2);
else buffer.Post(new ProcessData() { Error = e.Data });
};
下面是 Complete
方法的实现:
bool[] completeState = new bool[2];
void Complete(int index)
{
bool completed;
lock (completeState.SyncRoot)
{
completeState[index - 1] = true;
completed = completeState.All(v => v);
}
if (completed) buffer.Complete();
}
针对实际问题,读取区块的流程有问题。最简单的解决方案是只使用一个 buffer 与多个 producer 和一个 consumer 结合消息包.
您试图用 DataFlow 块 解决的概念问题是 events 和 异步流。 事件被推送,异步流被拉取。
有几种解决方案可以将它们映射在一起,但我认为最优雅的方法是使用 Unbounded Channel 作为 buffer.
Channels 比 DataFlow 更现代,自由度更小,比 BufferBlock
更简洁,而且非常轻巧且高度优化。此外,我会为不同的响应类型传递一个 wrapper。
忽略任何其他问题(概念性或其他方面)。
给定
public enum MessageType
{
Output,
Error
}
public class Message
{
public MessageType MessageType { get; set; }
public string Data { get; set; }
public Message(string data, MessageType messageType )
{
Data = data;
MessageType = messageType;
}
}
用法
private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
ProcessStartInfo processStartInfo,
CancellationToken cancellationToken)
{
using var process = new Process() { StartInfo = processStartInfo };
var ch = Channel.CreateUnbounded<Message>();
var completeCount = 0;
void OnReceived(string data, MessageType type)
{
// The Interlocked memory barrier is likely overkill here
if (data is null && Interlocked.Increment(ref completeCount) == 2)
ch?.Writer.Complete();
else
ch?.Writer.WriteAsync(new Message(data, type), cancellationToken);
}
process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);
// start the process
// ...
await foreach (var message in ch.Reader
.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
yield return message;
// cleanup
// ...
}
注意:完全未经测试
而是在退出时完成。
void HandleData(object sender, DataReceivedEventArgs e)
{
if (e.Data != null) dataBuffer.Post(e.Data);
}
process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) =>
{
process.WaitForExit();
dataBuffer.Complete();
};