TPL 数据流管道不断 运行 内部服务
TPL Dataflow pipe constantly running inside service
比如我有3块:
Buffer -> Transform -> Action
我是 运行 webapi 服务,它将请求中的数据带到缓冲区块。如何创建这样一个永远 运行 的管道,而无需在 Action 块中调用 Completion() 并停止整个管道。
如果您需要管道在应用程序的整个生命周期内保持不变,而不仅仅是请求,您可以使用静态 class 来保存它。不一定需要在操作块上调用完成。根据您的需要,另一种选择是将应用程序和处理管道分开。这些可以由数据库消息队列或只是单独的服务器端应用程序分开。
@svick 有一个很好的观点,即使用 TaskCompletionSource 来确定管道何时完成特定项目。将所有内容放在一起,这是一个可能有用的快速示例:
public class Controller {
public async Task<int> PostToPipeline(int inputValue) {
var message = new MessageIn(inputValue);
MyPipeline.InputBuffer.Post(message);
return await message.Completion.Task;
}
}
public class MessageIn {
public MessageIn(int value) {
InputValue = value;
Completion = new TaskCompletionSource<int>();
}
public int InputValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public class MessageProcessed {
public int ProcessedValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public static class MyPipeline {
public static BufferBlock<MessageIn> InputBuffer { get; private set; }
private static TransformBlock<MessageIn, MessageProcessed> transform;
private static ActionBlock<MessageProcessed> action;
static MyPipeline() {
BuildPipeline();
LinkPipeline();
}
static void BuildPipeline() {
InputBuffer = new BufferBlock<MessageIn>();
transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
}
static void LinkPipeline() {
InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
}
static MessageProcessed TransformMessage(MessageIn message) {
return new MessageProcessed() {
ProcessedValue = message.InputValue++,
Completion = message.Completion
};
}
static void CompletedProcessing(MessageProcessed message) {
message.Completion.SetResult(message.ProcessedValue);
}
}
有几种方法可以协调管道内特定作业的完成;等待完成源可能是满足您需求的最佳方法。
Dataflow 没有很好的解决方案来获取特定输入的管道输出(因为它支持的不仅仅是简单的管道)。
解决此问题的方法是创建一个 TaskCompletionSource<T>
并将其与输入一起发送到管道。管道中的每个块将其发送到下一个块,最后一个块调用其 SetResult()
.
将输入发送到管道的代码然后可以 await
TaskCompletionSource
的 Task
等待管道的输出。
比如我有3块:
Buffer -> Transform -> Action
我是 运行 webapi 服务,它将请求中的数据带到缓冲区块。如何创建这样一个永远 运行 的管道,而无需在 Action 块中调用 Completion() 并停止整个管道。
如果您需要管道在应用程序的整个生命周期内保持不变,而不仅仅是请求,您可以使用静态 class 来保存它。不一定需要在操作块上调用完成。根据您的需要,另一种选择是将应用程序和处理管道分开。这些可以由数据库消息队列或只是单独的服务器端应用程序分开。
@svick 有一个很好的观点,即使用 TaskCompletionSource 来确定管道何时完成特定项目。将所有内容放在一起,这是一个可能有用的快速示例:
public class Controller {
public async Task<int> PostToPipeline(int inputValue) {
var message = new MessageIn(inputValue);
MyPipeline.InputBuffer.Post(message);
return await message.Completion.Task;
}
}
public class MessageIn {
public MessageIn(int value) {
InputValue = value;
Completion = new TaskCompletionSource<int>();
}
public int InputValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public class MessageProcessed {
public int ProcessedValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public static class MyPipeline {
public static BufferBlock<MessageIn> InputBuffer { get; private set; }
private static TransformBlock<MessageIn, MessageProcessed> transform;
private static ActionBlock<MessageProcessed> action;
static MyPipeline() {
BuildPipeline();
LinkPipeline();
}
static void BuildPipeline() {
InputBuffer = new BufferBlock<MessageIn>();
transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
}
static void LinkPipeline() {
InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
}
static MessageProcessed TransformMessage(MessageIn message) {
return new MessageProcessed() {
ProcessedValue = message.InputValue++,
Completion = message.Completion
};
}
static void CompletedProcessing(MessageProcessed message) {
message.Completion.SetResult(message.ProcessedValue);
}
}
有几种方法可以协调管道内特定作业的完成;等待完成源可能是满足您需求的最佳方法。
Dataflow 没有很好的解决方案来获取特定输入的管道输出(因为它支持的不仅仅是简单的管道)。
解决此问题的方法是创建一个 TaskCompletionSource<T>
并将其与输入一起发送到管道。管道中的每个块将其发送到下一个块,最后一个块调用其 SetResult()
.
将输入发送到管道的代码然后可以 await
TaskCompletionSource
的 Task
等待管道的输出。