Reopen TPL Dataflow input after marking it complete

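I'm trying to build a processing pipeline service that users can put items into and then wait on until the results have finished processing. The idea is to make it injectable through DI.

The problem I'm facing is that after the first set of data has been processed and the input block has been marked complete, the block stays closed when I try to process another set of data. Is there a way to reopen the pipeline so that data can be processed again?

I'm also using a library called DataflowEx on top of TPL Dataflow.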

    public interface IPipelineService
    {
        Task FillPipeline(object inputObj);

        Task WaitForResults();

        Task<List<object>> GetResults();

        Task FlushPipeline();

        Task Complete();
    }

    public class Pipeline : Dataflow<object>, IPipelineService
    {
        private TransformBlock<object, object> _inputBlock;
        private ActionBlock<object> _resultBlock;

        private List<object> _results { get; set; }

        public Pipeline() : base(DataflowOptions.Default)
        {
            _results = new List<object>();

            _inputBlock = new TransformBlock<object, object>(obj => Processing.Processing.ReceiveOrder(obj));
            _resultBlock = new ActionBlock<object>(obj => _results.Add(Processing.Processing.ReturnProcessedOrder(obj)));

            _inputBlock.LinkTo(_resultBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            RegisterChild(_inputBlock);
            RegisterChild(_resultBlock);
        }

        public Task FillPipeline(object inputObj)
        {
            // SendAsync yields false once the input block has been completed.
            return InputBlock.SendAsync(inputObj);
        }

        public async Task WaitForResults()
        {
            await this.CompletionTask;
        }

        public Task<List<object>> GetResults()
        {
            return Task.FromResult(_results);
        }

        public Task FlushPipeline()
        {
            _results = new List<object>();
            return Task.CompletedTask;
        }

        Task IPipelineService.Complete()
        {
            InputBlock.Complete();
            return Task.CompletedTask;
        }

        public override ITargetBlock<object> InputBlock { get { return _inputBlock; } }

        public object Result { get { return _results; } }
    }

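This is the basic example I'm currently using to test the idea.

This is how I'd like to be able to use it, including feeding items into it again after it has finished processing the first set.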

await _pipelineService.FillPipeline(new GenerateOrder(OrderType.HomeLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.OtherLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.PersonalLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.CarLoan).order);
await _pipelineService.Complete();
await _pipelineService.WaitForResults();

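You can't restart a completed set of dataflow blocks: once Complete() has been called, a block permanently declines new messages. I just reset my object to start over (in this case, I call ResetDataFlow() from CompleteAsync()).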

public class DownloadConnector
{
    public DownloadDataFlow DataFlow { get; set; }

    public DownloadConnector(int maxDop)
    {
        DataFlow = new DownloadDataFlow(maxDop);
    }

    public async Task SendAsync(DownloadItem item)
    {
        await DataFlow.BufferBlock.SendAsync(item);
    }

    public async Task CompleteAsync()
    {
        DataFlow.BufferBlock.Complete();
        await DataFlow.ActionBlock.Completion;
        DataFlow.ResetDataFlow();
    }
}

public class DownloadDataFlow
{
    public BufferBlock<DownloadItem> BufferBlock { get; set; }
    public TransformBlock<DownloadItem, DownloadItem> TransformBlock { get; set; }
    public ActionBlock<DownloadItem> ActionBlock { get; set; }
    public int MaxDop { get; set; }

    public DownloadDataFlow(int maxDop)
    {
        MaxDop = maxDop;
        ResetDataFlow();
    }

    public DownloadDataFlow ResetDataFlow()
    { 
        BufferBlock = new BufferBlock<DownloadItem>();
        TransformBlock = new TransformBlock<DownloadItem, DownloadItem>(DownloadAsync);
        ActionBlock = new ActionBlock<DownloadItem>(OnCompletion, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxDop });
        BufferBlock.LinkTo(TransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        TransformBlock.LinkTo(ActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        return this;
    }

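    // Must return Task<DownloadItem> to match TransformBlock<DownloadItem, DownloadItem>.
    public async Task<DownloadItem> DownloadAsync(DownloadItem item)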
    {
        ...
    }

    public async Task OnCompletion(DownloadItem item)
    {
        ...
    }
}

public class DownloadItem
{
    ...
}

The code is used like this:

var connector = new DownloadConnector(10);
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.CompleteAsync();

await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.CompleteAsync();
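
For the pipeline service in the question, the same reset idea could look roughly like the sketch below. It uses plain TPL Dataflow rather than DataflowEx, and ResettablePipeline, CompleteBatchAsync and Demo are hypothetical names made up for illustration, not part of either library. It also assumes a single caller at a time: nothing here guards against SendAsync racing with the reset.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper (not part of TPL Dataflow or DataflowEx): it owns the
// blocks and rebuilds them after every batch, so callers never see a closed input.
public class ResettablePipeline<TIn, TOut>
{
    private readonly Func<TIn, TOut> _transform;
    private TransformBlock<TIn, TOut> _inputBlock;
    private ActionBlock<TOut> _resultBlock;
    private List<TOut> _results;

    public ResettablePipeline(Func<TIn, TOut> transform)
    {
        _transform = transform;
        Reset();
    }

    // Builds a fresh set of linked blocks and a fresh result list.
    private void Reset()
    {
        var results = new List<TOut>();
        _results = results;
        _inputBlock = new TransformBlock<TIn, TOut>(_transform);
        // ActionBlock handles one item at a time by default, so List.Add is safe here.
        _resultBlock = new ActionBlock<TOut>(item => results.Add(item));
        _inputBlock.LinkTo(_resultBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    // The task yields false if the current input block no longer accepts items.
    public Task<bool> SendAsync(TIn item) => _inputBlock.SendAsync(item);

    // Completes the current batch, waits for it to drain, returns its results,
    // and swaps in new blocks for the next batch.
    public async Task<IReadOnlyList<TOut>> CompleteBatchAsync()
    {
        _inputBlock.Complete();
        await _resultBlock.Completion;
        var finished = _results;
        Reset();
        return finished;
    }
}

public static class Demo
{
    // Runs two batches through the same instance and returns "first|second".
    public static async Task<string> RunAsync()
    {
        var pipeline = new ResettablePipeline<int, int>(x => x * 2);

        await pipeline.SendAsync(1);
        await pipeline.SendAsync(2);
        var first = await pipeline.CompleteBatchAsync();

        // The same instance accepts a second batch because its blocks were rebuilt.
        await pipeline.SendAsync(10);
        var second = await pipeline.CompleteBatchAsync();

        return string.Join(",", first) + "|" + string.Join(",", second);
    }
}
```

Because the blocks are swapped inside the wrapper, the object itself can be registered once with the DI container and reused across batches. If several callers may send and complete concurrently, some form of locking around the swap would be needed.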