使用 DataflowEx 封装

Encapsulating with DataflowEx

我想用 DataflowEx 封装下面这个测试。我不确定我的设计是否正确:我自己做了很多尝试,但始终无法像测试中那样让数据流正常完成,而且我也不清楚应该如何从外部 post 服务器与 IP 的组合。

    /// <summary>
    /// DataflowEx dataflow that accepts a proxy address string and emits an
    /// <see cref="HttpClientHandler"/> whose <c>Proxy</c> is a <see cref="WebProxy"/>
    /// built from that address.
    /// </summary>
    /// <remarks>
    /// Two blocks are chained internally: string -> WebProxy -> HttpClientHandler.
    /// A second, predicate-filtered link sends <c>null</c> proxies to a null target
    /// so that a message rejected by the first link cannot block the source.
    /// </remarks>
    class HttpClientHandlerFactory : Dataflow<string, HttpClientHandler> {
        private readonly TransformBlock<string, WebProxy> _webproxyDataflow;
        private readonly TransformBlock<WebProxy, HttpClientHandler> _httpClientHandlerDataflow;

        /// <summary>Creates the factory with <c>DataflowOptions.Default</c>.</summary>
        /// <param name="handlerFactoryData">Factory configuration; forwarded to the main constructor.</param>
        public HttpClientHandlerFactory(IHttpClientHandlerFactoryData handlerFactoryData) : this(DataflowOptions.Default, handlerFactoryData) {
        }

        /// <summary>Creates the factory and wires its internal blocks.</summary>
        /// <param name="dataflowOptions">Options passed to the base dataflow and used to derive the block options.</param>
        /// <param name="handlerFactoryData">Factory configuration; not read by this constructor.</param>
        public HttpClientHandlerFactory(DataflowOptions dataflowOptions, IHttpClientHandlerFactoryData handlerFactoryData) : base(dataflowOptions){
            var dataflowBlockOptions = dataflowOptions.ToExecutionBlockOption();

            // The transform has to return the proxy it creates. An async lambda without a
            // return value matches neither Func<string, WebProxy> nor
            // Func<string, Task<WebProxy>>, so a plain synchronous lambda is used.
            _webproxyDataflow = new TransformBlock<string, WebProxy>(s => new WebProxy(s), dataflowBlockOptions);
            _httpClientHandlerDataflow = new TransformBlock<WebProxy, HttpClientHandler>(proxy => new HttpClientHandler(){Proxy = proxy}, dataflowBlockOptions);

            var dataflowLinkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

            // Non-null proxies continue to the handler block.
            _webproxyDataflow.LinkTo(_httpClientHandlerDataflow, dataflowLinkOptions, proxy => proxy != null);

            // Null proxies are discarded; without this link a message that fails the
            // predicate above would have no target to go to.
            var nullTarget = DataflowBlock.NullTarget<WebProxy>();
            _webproxyDataflow.LinkTo(nullTarget, dataflowLinkOptions, proxy => proxy == null);

            RegisterChild(_webproxyDataflow);
            RegisterChild(_httpClientHandlerDataflow);
        }

        /// <summary>Entry point: the block that converts address strings to proxies.</summary>
        public override ITargetBlock<string> InputBlock => _webproxyDataflow;

        /// <summary>Exit point: the block that produces the configured handlers.</summary>
        public override ISourceBlock<HttpClientHandler> OutputBlock => _httpClientHandlerDataflow;
    }

    /// <summary>
    /// Member-less <see cref="IHttpClientHandlerFactoryData"/> implementation,
    /// used where a factory-data argument is required but nothing is configured.
    /// </summary>
    class FactoryData : IHttpClientHandlerFactoryData { }
    // First attempt: pairs each HttpClientHandler produced by the factory with a base
    // address via a JoinBlock, builds an HttpClient per pair in a TransformBlock, and
    // awaits the TransformBlock's completion.
    // NOTE(review): the method is `async void`, so a caller cannot await it or observe
    // its exceptions; returning Task is the usual form for async tests.
    [Fact]
    public async void MethodName(){
        var httpClientHandlers = new BufferBlock<HttpClientHandler>();
        var httpClientHandlerFactory = new HttpClientHandlerFactory(new FactoryData());
        // Factory output is collected in an intermediate buffer before the join.
        httpClientHandlerFactory.LinkTo(httpClientHandlers.ToDataflow());
        var baseAddresses = new BufferBlock<Uri>();
        var transformBlock = new TransformBlock<Tuple<HttpClientHandler, Uri>,HttpClient>(
                data => {
                    // Logs "<uri>-<managed thread id>-<proxy address>" for each pair.
                    WriteLine($"{data.Item2.ToString()}-{Environment.CurrentManagedThreadId}-{((WebProxy) data.Item1.Proxy).Address}");
                    return new HttpClient(data.Item1){BaseAddress = data.Item2};
                });
        var joinBlock = new JoinBlock<HttpClientHandler,Uri>();
        // These two links pass no link options, so no PropagateCompletion is requested
        // here; only the join -> transform link below asks for it.
        httpClientHandlers.LinkTo(joinBlock.Target1);
        baseAddresses.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(transformBlock,new DataflowLinkOptions(){PropagateCompletion = true});

        baseAddresses.Post(new Uri("http://www.serverA.com"));
        baseAddresses.Post(new Uri("http://www.ServerB.com"));
        httpClientHandlerFactory.Post("127.0.0.1");
        httpClientHandlerFactory.Post("127.0.0.2");
        // The join block is completed directly, right after the posts.
        // NOTE(review): the handlers still have to travel through the factory's blocks,
        // so they may not have reached the join yet at this point - confirm.
        joinBlock.Complete();
        // NOTE(review): nothing is linked to transformBlock's output in this method;
        // presumably its Completion cannot finish while produced items stay unconsumed - verify.
        await transformBlock.Completion;

    }

更新

仔细阅读 JSteward 关于缺少 NullTarget 块的评论之后,我意识到我最初直接使用原始(raw)块的写法并不正确。下面是修正后的版本。

    // Corrected raw-block version: handlers from the factory and base addresses are
    // paired by a JoinBlock, turned into HttpClient instances by a TransformBlock, and
    // the resulting clients are drained into a null target. Completion is requested on
    // both sources and propagates through every link.
    [Fact]
    public async Task MethodName() {
        var propagate = new DataflowLinkOptions() { PropagateCompletion = true };
        var handlerFactory = new HttpClientHandlerFactory(new HttpClientHandlerFactoryData());
        var uris = new BufferBlock<Uri>();
        var pairs = new JoinBlock<HttpClientHandler, Uri>();
        var clients = new TransformBlock<Tuple<HttpClientHandler, Uri>, HttpClient>(
            data => {
                // Logs "<uri>-<managed thread id>-<proxy address>" for each pair.
                WriteLine($"{data.Item2.ToString()}-{Environment.CurrentManagedThreadId}-{((WebProxy)data.Item1.Proxy).Address}");
                return new HttpClient(data.Item1) { BaseAddress = data.Item2 };
            });

        // Wiring: factory -> join.Target1, uris -> join.Target2, join -> transform.
        handlerFactory.OutputBlock.LinkTo(pairs.Target1, propagate);
        uris.LinkTo(pairs.Target2, propagate);
        pairs.LinkTo(clients, propagate);

        foreach (var address in new[] { "http://www.serverA.com", "http://www.ServerB.com" }) {
            uris.Post(new Uri(address));
        }
        foreach (var proxyAddress in new[] { "127.0.0.1", "127.0.0.2" }) {
            handlerFactory.Post(proxyAddress);
        }

        // Signal the end of input on both sources.
        handlerFactory.Complete();
        uris.Complete();

        // Give the transform block a consumer for the clients it produces.
        clients.LinkTo(DataflowBlock.NullTarget<HttpClient>(), propagate);

        await clients.Completion;
    }

它以这个输出完成。

    02:51:41.3365|http://www.servera.com/-11-http://127.0.0.1/
    02:51:41.3365|http://www.serverb.com/-11-http://127.0.0.2/

现在,为了迁移到 DataflowEx,我声明了下面这个类:

    /// <summary>
    /// DataflowEx dataflow that accepts proxy address strings and emits
    /// <see cref="HttpClient"/> instances. Each handler produced by the inner
    /// <c>HttpClientHandlerFactory</c> is joined with one base address taken from the
    /// factory data, and the pair is turned into a client.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>_baseAddresses</c> is registered as a child, but nothing in this
    /// class completes it, and <c>_joinBlock</c> is not registered as a child.
    /// </remarks>
    class HttpClientFactory:Dataflow<string,HttpClient>{
        private readonly HttpClientHandlerFactory _httpClientHandlerFactory;
        private readonly TransformBlock<Tuple<HttpClientHandler, Uri>, HttpClient> _transformBlock;
        private JoinBlock<HttpClientHandler, Uri> _joinBlock;
        private BufferBlock<Uri> _baseAddresses;

        /// <summary>Creates the factory with <c>DataflowOptions.Default</c>.</summary>
        /// <param name="httpClientHandlerFactoryData">Factory data forwarded to the main constructor.</param>
        public HttpClientFactory(IHttpClientFactoryData httpClientHandlerFactoryData) : this(DataflowOptions.Default, httpClientHandlerFactoryData) {
        }

        /// <summary>Builds the internal blocks, links them, and posts the configured base addresses.</summary>
        /// <param name="dataflowOptions">Options passed to the base dataflow.</param>
        /// <param name="handlerFactoryData">Supplies the logger and the base addresses.</param>
        public HttpClientFactory(DataflowOptions dataflowOptions, IHttpClientFactoryData handlerFactoryData) : base(dataflowOptions) {
            // Builds one client per (handler, base address) pair and logs
            // "<uri>-<managed thread id>-<proxy address>" for it.
            Func<Tuple<HttpClientHandler, Uri>, HttpClient> createClient = data => {
                handlerFactoryData.Logger.WriteLine($"{data.Item2.ToString()}-{Environment.CurrentManagedThreadId}-{((WebProxy)data.Item1.Proxy).Address}");
                return new HttpClient(data.Item1) { BaseAddress = data.Item2 };
            };

            _httpClientHandlerFactory = new HttpClientHandlerFactory(new HttpClientHandlerFactoryData());
            _baseAddresses = new BufferBlock<Uri>();
            _joinBlock = new JoinBlock<HttpClientHandler, Uri>();
            _transformBlock = new TransformBlock<Tuple<HttpClientHandler, Uri>, HttpClient>(createClient);

            // Wiring: handler factory -> join.Target1, base addresses -> join.Target2,
            // join -> transform; every link propagates completion.
            var propagateCompletion = new DataflowLinkOptions() { PropagateCompletion = true };
            _httpClientHandlerFactory.OutputBlock.LinkTo(_joinBlock.Target1, propagateCompletion);
            _baseAddresses.LinkTo(_joinBlock.Target2, propagateCompletion);
            _joinBlock.LinkTo(_transformBlock, propagateCompletion);

            RegisterChild(_transformBlock);
            RegisterChild(_httpClientHandlerFactory);
            RegisterChild(_baseAddresses);

            // Base addresses are fed in up front, at construction time.
            foreach (var uri in handlerFactoryData.BaseAddresses){
                _baseAddresses.Post(uri);
            }
        }

        /// <summary>Entry point: the inner handler factory's input block.</summary>
        public override ITargetBlock<string> InputBlock => _httpClientHandlerFactory.InputBlock;

        /// <summary>Exit point: the block that produces the clients.</summary>
        public override ISourceBlock<HttpClient> OutputBlock => _transformBlock;
    }

并编写了一个测试,它完成了预期的输出

    // Posts one proxy address into an HttpClientFactory configured with a single base
    // address, links the output to null, calls Complete() on the factory and awaits
    // the completion of its OutputBlock.
    [Fact]
    public async Task MethodName2(){
        var baseAddresses = new[]{new Uri("http://www.serverA.com")};
        var factoryData = new HttpClientFactoryData(baseAddresses){Logger = this};
        var factory = new HttpClientFactory(DataflowOptions.Verbose,factoryData);

        factory.InputBlock.Post("127.0.0.1");
        factory.LinkLeftToNull();
        factory.Complete();

        // Waits on the output block only, not on the dataflow as a whole.
        await factory.OutputBlock.Completion;
    }

    02:55:44.8536|[HttpClientFactory1] has 0 todo items (in:0, out:0) at this moment.
    02:55:45.0039|[HttpClientFactory1]->[HttpClientHandlerFactory1] completed
    02:55:45.0039|http://www.servera.com/-11-http://127.0.0.1/

但是在上面的测试中,我直接调用了 Complete 方法,并且等待的是 OutputBlock 的完成,而不是整个 Dataflow 的完成。下面这个改用 DataflowEx 内置方法的测试则始终无法完成。

    // Same setup as the manual-Complete variant, but completion is requested and
    // awaited through the dataflow's own SignalAndWaitForCompletionAsync().
    [Fact]
    public async Task MethodName2(){
        var baseAddresses = new[]{new Uri("http://www.serverA.com")};
        var factoryData = new HttpClientFactoryData(baseAddresses){Logger = this};
        var factory = new HttpClientFactory(DataflowOptions.Verbose,factoryData);

        factory.InputBlock.Post("127.0.0.1");
        factory.LinkLeftToNull();

        await factory.SignalAndWaitForCompletionAsync();
    }

重写(override)Complete 方法,让它同时完成额外的那个 BufferBlock,就可以使测试通过。

// Completes the dataflow through the base implementation and then also completes
// the _baseAddresses block, which the base call is not relied on to complete.
// NOTE(review): presumably this override belongs to the class that declares
// _baseAddresses - confirm placement.
public override void Complete(){
    base.Complete();
    // Explicitly signal that no more base addresses will be posted.
    _baseAddresses.Complete();
}