TPL 数据流管道不断 运行 内部服务

TPL Dataflow pipe constantly running inside service

比如我有3块:

       Buffer -> Transform -> Action 

我是 运行 webapi 服务,它将请求中的数据带到缓冲区块。如何创建这样一个永远 运行 的管道,而无需在 Action 块中调用 Completion() 并停止整个管道。

如果您需要管道在应用程序的整个生命周期内保持不变,而不仅仅是请求,您可以使用静态 class 来保存它。不一定需要在操作块上调用完成。根据您的需要,另一种选择是将应用程序和处理管道分开。这些可以由数据库消息队列或只是单独的服务器端应用程序分开。

@svick 有一个很好的观点,即使用 TaskCompletionSource 来确定管道何时完成特定项目。将所有内容放在一起,这是一个可能有用的快速示例:

public class Controller {

    public async Task<int> PostToPipeline(int inputValue) {
        var message = new MessageIn(inputValue);
        MyPipeline.InputBuffer.Post(message);
        return await message.Completion.Task;
    }
}

public class MessageIn {
    public MessageIn(int value) {
        InputValue = value;
        Completion = new TaskCompletionSource<int>();
    }

    public int InputValue { get; set; }
    public TaskCompletionSource<int> Completion { get; set; }
}

public class MessageProcessed {
    public int ProcessedValue { get; set; }
    public TaskCompletionSource<int> Completion { get; set; }
}

public static class MyPipeline {

    public static BufferBlock<MessageIn> InputBuffer { get; private set; }
    private static TransformBlock<MessageIn, MessageProcessed> transform;
    private static ActionBlock<MessageProcessed> action;

    static MyPipeline() {
        BuildPipeline();
        LinkPipeline();

    }

    static void BuildPipeline() {
        InputBuffer = new BufferBlock<MessageIn>();

        transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 10
        });

        action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 10
        });
    }

    static void LinkPipeline() {
        InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
        transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    static MessageProcessed TransformMessage(MessageIn message) {
        return new MessageProcessed() {
            ProcessedValue = message.InputValue++,
            Completion = message.Completion
        };
    }

    static void CompletedProcessing(MessageProcessed message) {
        message.Completion.SetResult(message.ProcessedValue);
    }
} 

有几种方法可以协调管道内特定作业的完成;等待完成源可能是满足您需求的最佳方法。

Dataflow 没有很好的解决方案来获取特定输入的管道输出(因为它支持的不仅仅是简单的管道)。

解决此问题的方法是创建一个 TaskCompletionSource<T> 并将其与输入一起发送到管道。管道中的每个块将其发送到下一个块,最后一个块调用其 SetResult().

将输入发送到管道的代码然后可以 await TaskCompletionSourceTask 等待管道的输出。