当消费者不堪重负时,如何让快速生产者暂停?

How to make fast producer paused when consumer is overwhelmed?

我在我的应用程序中使用 TPL 数据流实现了生产者/消费者模式。我有一个大数据流网格,里面有大约 40 个块。网格中有两个主要的功能部分:生产者部分和消费者部分。生产者应该不断地为消费者提供大量工作,而消费者有时会缓慢地处理传入的工作。当消费者忙于某些指定数量的工作项目时,我想暂停生产者。否则该应用会消耗大量内存/CPU 并且表现不可持续。

我制作了演示该问题的演示应用程序:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };

            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);

            var consumerBlock = new ActionBlock<int>(async x =>
            {
                var delay = 1000;
                if (x > 10) delay = 5000;

                await Task.Delay(delay);

                Console.WriteLine(x);
            }, boundedOptions);

            producerBlock.LinkTo(bufferBlock);
            bufferBlock.LinkTo(broadcastBlock);
            broadcastBlock.LinkTo(producerBlock);
            broadcastBlock.LinkTo(consumerBlock);

            bufferBlock.Post(1);

            consumerBlock.Completion.Wait();            
        }        
    }
}

应用打印如下内容:

2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079

这意味着生产者不断旋转并将消息推送给消费者。我希望它暂停并等到消费者完成当前部分工作,然后生产者应继续为消费者提供消息。

我的问题与其他 question 相似,但没有得到正确回答。我尝试了该解决方案,但它在这里不起作用,允许生产者向消费者发送大量消息。另外设置 BoundedCapacity 也不起作用。

到目前为止我猜想的唯一解决方案是制作我自己的块来监视目标块队列并根据目标块的队列进行操作。但我希望这对这个问题有点矫枉过正。

看来你的producer是生成sequence的,所以不需要producer→buffer→broadcast的整个循环。相反,所有三个块都可以替换为一个 async 循环,该循环生成下一个项目,然后使用 await SendAsync():

将其发送给消费者
Task.Run(async () =>
{
    int i = 1;
    while (true)
    {
        await consumerBlock.SendAsync(i);
        i++;
    }
    consumerBlock.Complete();
});

这样,一旦消费者达到其容量,await SendAsync() 将确保生产者等待消费者消费物品。

如果你想将这样的生产者封装到一个数据流块中,这样你就可以,例如link 它给消费者,

如果您需要保持生产者 → 缓冲区 → 广播周期完整,那么您需要将广播块替换为其他仍然广播它收到的消息但在其目标之一已满时等待的块。

只要您在创建该块时知道该块的目标,就可以使用 ActionBlock(从 another answer of mine 复制的代码)构建它:

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets)
{
    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targets)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targets)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}

使用这个,你可以声明广播块:

var broadcastBlock = CreateGuaranteedBroadcastBlock(
    boundedOptions, producerBlock, consumerBlock);

(您还需要从 broadcastBlock 中删除 link 的 LinkTo 行。)

你的原始代码的一个问题是完成,但它没有解决,但这在 TPL 数据流中通常是一个难题。