Backpressure via BufferBlock not working. (C# TPL Dataflow)

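Typical situation: a fast producer and a slow consumer, and the producer needs to be slowed down.
The sample code below does not behave as I expected (explanation follows):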

//  I assumed this block will behave like BlockingCollection, but it doesn't 
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});

// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});

// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 2,
});

bb.LinkTo(ab);

ab.Completion.Wait();

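Why I thought this code would work (it doesn't):

The same thing can be done with BlockingCollection, and I assumed BufferBlock was its equivalent in the TPL Dataflow (TDF) world. I guess I have misunderstood how backpressure works in TPL Dataflow.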

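So where is the problem? How can I run this pipeline so that buffer bb never holds more than 3 messages, and then wait for the pipeline to complete?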

PS: I found this gist (https://gist.github.com/mnadel/df2ec09fe7eae9ba8938), which suggests maintaining a semaphore to block writes to the BufferBlock. I thought this was "built-in".

Update after accepting the answer:

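If you are looking at this question, keep in mind that ActionBlock also has its own input buffer.

That's the first point. The second is that, since every block has its own input buffer, you don't need a BufferBlock for what its name might suggest. BufferBlock is more of a utility block for more complex architectures, or a load-balancing block. It is not a backpressure buffer.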
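The question's code comment that BoundedCapacity "looks like this does nothing" is not quite right: the bound is enforced, but Post reacts to a full BufferBlock by declining the item instead of waiting for room. A minimal sketch to observe this, assuming a BufferBlock with no target linked, so nothing drains it (`BoundedBufferDemo` and `Run` are illustrative names, not part of any answer):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Illustrative sketch; class and method names are hypothetical.
public static class BoundedBufferDemo
{
    // Posts `count` items into an unlinked BufferBlock with BoundedCapacity = 3
    // and returns what Post reported for each item plus how many items are stored.
    public static (bool[] posted, int stored) Run(int count)
    {
        var bb = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 3 });
        var posted = new bool[count];
        for (int i = 0; i < count; i++)
        {
            // Post never waits: once 3 items are stored it returns false
            // and the item is dropped, not queued.
            posted[i] = bb.Post(i);
        }
        return (posted, bb.Count);
    }

    public static void Main()
    {
        var (posted, stored) = Run(5);
        Console.WriteLine($"Post results: {string.Join(", ", posted)}; items stored: {stored}");
    }
}
```

Running it prints `Post results: True, True, True, False, False; items stored: 3`. In the question's producer the return value of Post is ignored, so the loop never slows down and anything posted while bb is full is silently lost.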

Completion propagation has to be enabled explicitly at the link level.

When calling .LinkTo(), pass new DataflowLinkOptions {PropagateCompletion = true} as the second argument.
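A small sketch of what that link option changes, assuming a BufferBlock head, an ActionBlock tail, and an arbitrary one-second timeout standing in for "never completes" (`CompletionDemo` and `TailCompletesAsync` are illustrative names):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative sketch; class and method names are hypothetical.
public static class CompletionDemo
{
    // Links head -> tail, completes only the head, and reports whether
    // the tail's Completion task finished within one second.
    public static async Task<bool> TailCompletesAsync(bool propagate)
    {
        var head = new BufferBlock<int>();
        var tail = new ActionBlock<int>(_ => { /* no-op: the demo is only about completion */ });
        head.LinkTo(tail, new DataflowLinkOptions { PropagateCompletion = propagate });

        for (int i = 0; i < 5; i++) head.Post(i);
        head.Complete(); // only the head of the chain is completed explicitly

        // Bounded wait so the "no propagation" case does not hang forever.
        var first = await Task.WhenAny(tail.Completion, Task.Delay(TimeSpan.FromSeconds(1)));
        return first == tail.Completion;
    }

    public static async Task Main()
    {
        Console.WriteLine($"PropagateCompletion = false -> tail completed: {await TailCompletesAsync(false)}");
        Console.WriteLine($"PropagateCompletion = true  -> tail completed: {await TailCompletesAsync(true)}");
    }
}
```

With `PropagateCompletion = false`, `head.Complete()` never reaches the tail, so waiting on the tail's Completion without a timeout blocks forever, which is what happens to `ab.Completion.Wait()` in the question.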

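To introduce backpressure, use SendAsync when sending items into the block. It lets your producer wait until the block is ready to accept the item. Something like this is what you're looking for: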

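using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        // At most 3 items may be waiting in (or being processed by) the block
        var options = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 3
        };
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(100);
            Console.WriteLine(i);
        }, options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            // SendAsync completes only once the block has accepted the item,
            // so awaiting it throttles the producer (backpressure)
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
    }
}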

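If you change this to use Post and print the result of each Post call (here with BoundedCapacity = 1 and a 1-second delay), you'll see that many items cannot be delivered to the block: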

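using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1
        };
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(1000);
            Console.WriteLine(i);
        }, options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            // Post never waits: it returns false (and the item is dropped)
            // when the block is already at its BoundedCapacity
            var result = block.Post(i);
            Console.WriteLine(result);
        }

        block.Complete();
        await block.Completion;
    }
}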

Output:

True
False
False
False
False
False
False
False
False
False
0
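To see the difference between Post and SendAsync without depending on timing, here is a minimal sketch, assuming a BufferBlock with BoundedCapacity = 1 that nothing is linked to and that is drained by hand with Receive (`PostVsSendAsyncDemo`, `RunAsync` and the tuple element names are illustrative):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative sketch; class, method and tuple names are hypothetical.
public static class PostVsSendAsyncDemo
{
    public static async Task<(bool post1, bool post2, bool pendingWhileFull, bool sendAccepted, int[] received)> RunAsync()
    {
        // Capacity 1 and no linked target: nothing drains the buffer by itself.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool post1 = buffer.Post(1); // accepted: there was room
        bool post2 = buffer.Post(2); // declined: buffer full, item 2 is simply dropped

        // SendAsync is not refused outright: the block postpones the offer and
        // the returned Task<bool> stays incomplete until a slot frees up.
        Task<bool> send3 = buffer.SendAsync(3);
        bool pendingWhileFull = !send3.IsCompleted;

        int first = buffer.Receive();   // takes item 1 and frees the slot
        bool accepted3 = await send3;   // the postponed item 3 is now consumed
        int second = buffer.Receive();  // blocks until item 3 is stored, then returns it

        return (post1, post2, pendingWhileFull, accepted3, new[] { first, second });
    }

    public static async Task Main()
    {
        var r = await RunAsync();
        Console.WriteLine($"Post(1)={r.post1}, Post(2)={r.post2}, SendAsync(3) pending while full={r.pendingWhileFull}");
        Console.WriteLine($"SendAsync(3) result={r.sendAccepted}, received: {string.Join(", ", r.received)}");
    }
}
```

Item 2 is lost because Post gives up immediately, while item 3 is delivered as soon as space appears; that waiting is exactly the backpressure a producer gets by awaiting SendAsync.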

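Guided by JSteward's answer, I came up with the following code. It produces (reads, etc.) new items while earlier items are being processed, maintaining a read-ahead buffer. When the "producer" has no more items, a completion signal is sent to the head of the chain. The program also waits for the whole chain to complete before terminating.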

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        // 24-hour timestamp with milliseconds, to make the interleaving visible
        string Time() => $"{DateTime.Now:HH:mm:ss.fff}";

        // the buffer is added to the chain just for demonstration purposes
        // the chain would work fine using just the built-in input buffer
        // of the `action` block.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions {BoundedCapacity = 3});

        var action = new ActionBlock<int>(async i =>
        {
            Console.WriteLine($"[{Time()}]: Processing: {i}");
            await Task.Delay(500);
        }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 2, BoundedCapacity = 2});

        // PropagateCompletion is required, otherwise buffer.Complete() never reaches `action`
        buffer.LinkTo(action, new DataflowLinkOptions {PropagateCompletion = true});

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            Console.WriteLine($"[{Time()}]: Ready to send: {i}");
            await buffer.SendAsync(i); // waits while `buffer` already holds 3 items
            Console.WriteLine($"[{Time()}]: Sent: {i}");
        }

        // we call `.Complete()` on the head of the chain and it's propagated forward
        buffer.Complete();

        await action.Completion;
    }
}