DataflowEx 永远不会完成
DataflowEx never completes
我正在尝试将开源库 DataflowEx 与下一个数据流声明一起使用。
class RequestClientFlow :Dataflow<string>{
private readonly ILogger _logger;
private readonly Dataflow<string, WebProxy> _webproxyDataflow;
private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow;
public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default){
_logger = logger;
}
public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow;
public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions){
_webproxyDataflow = new TransformBlock<string,WebProxy>(s => {
_logger.WriteLine("aaaa");
return new WebProxy();
}).ToDataflow();
_httpClientDataflow = new TransformBlock<WebProxy,HttpClient>(proxy => {
_logger.WriteLine("bbbb");
return new HttpClient();
}).ToDataflow();
_webproxyDataflow.LinkTo(_httpClientDataflow);
RegisterChild(_webproxyDataflow);
RegisterChild(_httpClientDataflow);
}
public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock;
}
当我像
一样消费它时
var requestClientFlow = new RequestClientFlow(this);
requestClientFlow.Post("");
requestClientFlow.Complete();
await requestClientFlow.InputBlock.Completion;
它完成并且我的输出显示
18:32:54.3773|aaaa 18:32:54.3773|bbbb
1 passed, 0 failed, 0 skipped, took 1.45 seconds (xUnit.net 2.3.1
build 3858).
但是我的理解是来自框架文档,我也应该能够使用
requestClientFlow.Complete();
await requestClientFlow.CompletionTask;
甚至
await requestClientFlow.SignalAndWaitForCompletionAsync();
它没有完成。有人可以帮我理解我做错了什么吗?
您的流程无法完成,因为最后一个块是 TransformBlock
。在您的第一个示例中,您 await
完成了 Input 块,它实际上完成了。 Output 块无法完成,因为其输出缓冲区中的项目无处可去。 DataflowEx
库在流程的最后一个块上正确 awaiting
。可以在最后加一个ActionBlock
或NullTarget
来实现补全。
就DataflowEx
而言,最终流程应该是直接的
public interface IDataflow<in TIn> : IDataflow
{
ITargetBlock<TIn> InputBlock { get; }
}
如图书馆 github 页面上的示例所示:
public class AggregatorFlow : Dataflow<string>
{
//...//
public AggregatorFlow() : base(DataflowOptions.Default)
{
_splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
_dict = new Dictionary<string, int>();
//***Note The ActionBlock here***
_aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));
//Block linking
_splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true });
/* IMPORTANT */
RegisterChild(_splitter);
RegisterChild(_aggregater);
}
//...//
}
我正在尝试将开源库 DataflowEx 与下一个数据流声明一起使用。
class RequestClientFlow :Dataflow<string>{
private readonly ILogger _logger;
private readonly Dataflow<string, WebProxy> _webproxyDataflow;
private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow;
public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default){
_logger = logger;
}
public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow;
public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions){
_webproxyDataflow = new TransformBlock<string,WebProxy>(s => {
_logger.WriteLine("aaaa");
return new WebProxy();
}).ToDataflow();
_httpClientDataflow = new TransformBlock<WebProxy,HttpClient>(proxy => {
_logger.WriteLine("bbbb");
return new HttpClient();
}).ToDataflow();
_webproxyDataflow.LinkTo(_httpClientDataflow);
RegisterChild(_webproxyDataflow);
RegisterChild(_httpClientDataflow);
}
public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock;
}
当我像
一样消费它时var requestClientFlow = new RequestClientFlow(this);
requestClientFlow.Post("");
requestClientFlow.Complete();
await requestClientFlow.InputBlock.Completion;
它完成并且我的输出显示
18:32:54.3773|aaaa 18:32:54.3773|bbbb
1 passed, 0 failed, 0 skipped, took 1.45 seconds (xUnit.net 2.3.1 build 3858).
但是我的理解是来自框架文档,我也应该能够使用
requestClientFlow.Complete();
await requestClientFlow.CompletionTask;
甚至
await requestClientFlow.SignalAndWaitForCompletionAsync();
它没有完成。有人可以帮我理解我做错了什么吗?
您的流程无法完成,因为最后一个块是 TransformBlock
。在您的第一个示例中,您 await
完成了 Input 块,它实际上完成了。 Output 块无法完成,因为其输出缓冲区中的项目无处可去。 DataflowEx
库在流程的最后一个块上正确 awaiting
。可以在最后加一个ActionBlock
或NullTarget
来实现补全。
就DataflowEx
而言,最终流程应该是直接的
public interface IDataflow<in TIn> : IDataflow
{
ITargetBlock<TIn> InputBlock { get; }
}
如图书馆 github 页面上的示例所示:
public class AggregatorFlow : Dataflow<string>
{
//...//
public AggregatorFlow() : base(DataflowOptions.Default)
{
_splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
_dict = new Dictionary<string, int>();
//***Note The ActionBlock here***
_aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));
//Block linking
_splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true });
/* IMPORTANT */
RegisterChild(_splitter);
RegisterChild(_aggregater);
}
//...//
}