DataflowEx 永远不会完成

DataflowEx never completes

我正在尝试将开源库 DataflowEx 与下一个数据流声明一起使用。

class RequestClientFlow :Dataflow<string>{
    private readonly ILogger _logger;
    private readonly Dataflow<string, WebProxy> _webproxyDataflow;
    private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow;

    public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default){
        _logger = logger;
    }

    public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow;

    public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions){
        _webproxyDataflow = new TransformBlock<string,WebProxy>(s => {
            _logger.WriteLine("aaaa");
            return new WebProxy();
        }).ToDataflow();
        _httpClientDataflow = new TransformBlock<WebProxy,HttpClient>(proxy => {
            _logger.WriteLine("bbbb");
            return new HttpClient();
        }).ToDataflow();
        _webproxyDataflow.LinkTo(_httpClientDataflow);
        RegisterChild(_webproxyDataflow);
        RegisterChild(_httpClientDataflow);
    }

    public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock;
}

当我像

一样消费它时
var requestClientFlow = new RequestClientFlow(this);
requestClientFlow.Post("");
requestClientFlow.Complete();
await requestClientFlow.InputBlock.Completion;

它完成并且我的输出显示

18:32:54.3773|aaaa 18:32:54.3773|bbbb

1 passed, 0 failed, 0 skipped, took 1.45 seconds (xUnit.net 2.3.1 build 3858).

但是我的理解是来自框架文档,我也应该能够使用

    requestClientFlow.Complete();
    await requestClientFlow.CompletionTask;

甚至

await requestClientFlow.SignalAndWaitForCompletionAsync();

它没有完成。有人可以帮我理解我做错了什么吗?

您的流程无法完成,因为最后一个块是 TransformBlock。在您的第一个示例中,您 await 完成了 Input 块,它实际上完成了。 Output 块无法完成,因为其输出缓冲区中的项目无处可去。 DataflowEx 库在流程的最后一个块上正确 awaiting。可以在最后加一个ActionBlockNullTarget来实现补全。

DataflowEx而言,最终流程应该是直接的

public interface IDataflow<in TIn> : IDataflow
{
    ITargetBlock<TIn> InputBlock { get; }
}

如图书馆 github 页面上的示例所示:

public class AggregatorFlow : Dataflow<string>
{
    //...//

    public AggregatorFlow() : base(DataflowOptions.Default)
    {
        _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
        _dict = new Dictionary<string, int>();

        //***Note The ActionBlock here***
        _aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));

        //Block linking
        _splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true });

        /* IMPORTANT */
        RegisterChild(_splitter);
        RegisterChild(_aggregater);
    }

    //...//
}