不能 运行 TPL 数据流管道

Can not run TPL Dataflow pipeline

我正在尝试使用 TPL Dataflow 创建一个管道,我可以在其中将消息存储在批处理块中,并且每当达到阈值时,它就会将数据发送到一个操作 block.I 添加了一个缓冲块以防动作块太慢。
到目前为止,我已经尝试了所有可能的方法将数据从第一个块移动到第二个块,但都无济于事。我已经 link 编辑了块,添加了 PropagateCompletionDataFlowLinkOptions 设置为 true。为了使此管道正常工作,我还需要做什么?

管道

class LogPipeline<T>
{
    private ActionBlock<T[]> actionBlock;
    private BufferBlock<T> bufferBlock;
    private BatchBlock<T> batchBlock;
    private readonly Action<T[]> action;
    private readonly int BufferSize;
    private readonly int BatchSize;

    public LogPipeline(Action<T[]> action, int bufferSize = 4, int batchSize = 2)
    {
        this.BufferSize = bufferSize;
        this.BatchSize = batchSize;
        this.action = action;
    }
    private void Initialize()
    {
        this.bufferBlock = new BufferBlock<T>(new DataflowBlockOptions
            { TaskScheduler = TaskScheduler.Default,
            BoundedCapacity = this.BufferSize });
        this.actionBlock = new ActionBlock<T[]>(this.action);
        this.batchBlock = new BatchBlock<T>(BatchSize);
        this.bufferBlock.LinkTo(this.batchBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
        this.batchBlock.LinkTo(this.actionBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
    }
    public void Post(T log)
    {
        this.bufferBlock.Post(log);
    }
    public void Start()
    {
        this.Initialize();
    }
    public void Stop()
    {
        actionBlock.Complete();
    }
}

测试

[TestCase(100, 1000, 5)]
public void CanBatchPipelineResults(int batchSize, int bufferSize, int cycles)
{

    List<int> data = new List<int>();
    LogPipeline<int> logPipeline = new LogPipeline<int>(
       batchSize: batchSize,
       bufferSize: bufferSize,
       action: (logs) =>
       {
           data.AddRange(logs);
       });
    logPipeline.Start();

    int SelectWithEffect(int element)
    {
        logPipeline.Post(element);
        return 3;
    }
    int count = 0;
    while (true)
    {
        if (count++ > cycles)
        {
            break;
        }
        var sent = Parallel.For(0, bufferSize, (x) => SelectWithEffect(x));
    }
    logPipeline.Stop();
    Assert.IsTrue(data.Count == cycles * batchSize);
}

为什么除了缓冲区之外我的所有块都是空的?我试过 SendAsync 也无济于事。无论我做什么,都没有数据从第一个块移动到下一个块。

我有和没有 link 选项。

更新 : 我已经完全删除了管道和 Parallel。 我尝试了各种输入块 (batch/buffer/transform),但似乎后续块无法得到任何东西。
我也尝试过 await SendAsync 以及 Post.
我只在 unit tests 类.
内尝试过 这可能是问题所在吗?

更新 2 我把事情复杂化是错误的,我尝试了一个更简单的例子。在测试用例中,即使这样也行不通:

List<int> items=new List<int>(); var tf=new TransformBlock<int,int>(x=>x+1); var action= new ActionBlock<int>(x=>items.Add(x)); tf.LinkTo(action, new DataFlowOptions{ PropagateCompletion=true}); tf.Post(3); //Breakpoint here

通过在将数据发送到 BufferBlock 后立即调用 logPipeline.Stop,您正在完成 ActionBlock,因此它将拒绝 BatchBlock 正在尝试的所有消息稍后发送给它。来自 ActionBlock.Complete 方法的文档:

Signals to the dataflow block that it shouldn't accept or produce any more messages and shouldn't consume any more postponed messages.


更新:关于问题中更新的要求:

Whenever its threshold is hit it would send the data to an action block.

...我的建议是将此逻辑移动到 LogPipeline.Post 方法中。方法 BufferBlock.Post returns false 如果块没有接受发送给它的数据。

public void Post(T log)
{
    if (!this.bufferBlock.Post(log)) this.actionBlock.Post(log);
}

在测试结束之前似乎没有发生任何事情的原因是块的 none 有机会 运行。代码 通过使用 Parallel.For 阻塞 所有 CPU,因此没有其他任务有机会 运行。这意味着所有发布的消息仍在第一个块中。然后代码在最后一个块上调用 Complete,但甚至没有等待它在检查结果之前完成处理。

代码可以简化很多。对于初学者来说,所有块都有输入缓冲区,它们不需要额外的缓冲。

管道可以用这个代替:

//Arrange
var list=new List<int>();

var head=new BatchBlock<int>(BatchSize);
var act=new ActionBlock<int[]>(nums=>list.AddRange(nums);

var options= new DataflowLinkOptions{ PropagateCompletion = true };
head.LinkTo(act);

//ACT

//Just fire everything at once, because why not
var tasks=Enumerable.Range(0,cycles)(
    i=>Task.Run(()=> head.Post(i)));
await tasks;

//Tell the head block we're done
head.Complete();
//Wait for the last block to complete
await act.Completion;

//ASSERT
Assert.Equal(cycles, data.Count);

没有必要创建复杂的 class 来封装管道。它不会 "start" - 如果块没有数据,它们什么也不做。要抽象它,只需要提供对头块和最后一个块的Completion任务

的访问