TPL 数据流 - 非常快的生产者,不是那么快的消费者 OutOfMemory 异常

TPL Dataflow - very fast producer, not so fast consumers OutOfMemory exception

在将 TPL 数据流移植到我的生产代码中之前,我正在试验它。 生产代码是一个 classical producer/consumer 系统 - 生产者生产消息(与金融领域相关),消费者处理这些消息。

我感兴趣的是,如果在某个时候生产者的生产速度比消费者可以处理的速度快得多(系统会崩溃,还是会发生什么),更重要的是什么在这些情况下要做。

因此,为了尝试拥有类似的简单应用程序,我提出了以下建议。

var bufferBlock = new BufferBlock<Item>();

var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
                        ,
    BoundedCapacity = 100000
};

var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executiondataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
Console.ReadLine();

Item很简单class

internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}

GenerateItem 简单新闻 Item

static Item GenerateItem()
{
   return new Item(Guid.NewGuid().ToString());
}

现在,模仿 没那么快 消费者 - 我让 ProcessItem 持有 100ms

static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}

执行此操作会在 20 秒左右出现 OOM 异常。

然后我继续添加更多的消费者(最多 10 个更多的 ActionBlock),这赢得了更多的时间,但最终导致了相同的 OOM 异常。

我也注意到GC压力很大(VS 2015诊断工具显示GC几乎一直是运行),所以我引入了对象池(很简单的一个,本质上就是ConcurrentBag 存储项目)Item,但我仍然遇到同样的问题(抛出 OOM 异常)。

详细说明内存中的内容,为什么 运行 不在内存中。

为了确保较慢的生产者能够让消费者跟上,我让生产者在迭代之间休眠:

for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}

它工作正常 - 没有抛出异常,内存使用率一直很低,我再也看不到任何 GC 压力。

所以我很少问题

  1. 在尝试使用 TPL 数据流构建块
  2. 非常快速地重现 producer/slow 消费者场景时,我是否做错了什么?
  3. 有什么方法可以使这项工作正常进行而不因 OOM 异常而失败。
  4. 有关如何在 TPL 数据流上下文中处理此类场景(非常快 producer/slow 消费者)的最佳实践的任何 comments/links。
  5. 我对这个问题的理解是 - 由于消费者跟不上,BufferBlock 的内部缓冲区很快就会被消息填满,并且会延迟消息直到一些消费者回来请求下一条消息,因为应用程序内存不足(由于 BufferBlock 的内部缓冲区已满)- 你同意吗?

我正在使用 Microsoft.Tpl.Dataflow 包版本 4.5.24。 .NET 4.5(C#6)。进程是 32 位的。

您很好地识别了问题:BufferBlock 正在填充其输入缓冲区,直到它达到 OOM。

要解决这个问题,您还应该在缓冲区块中添加一个 BoundedCapacity 选项。这将自动为您限制生产者(不需要生产者中的 Thread.Sleep)。

以下代码可能存在严重问题:

for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem()); // Don't do this!
}

SendAsync method returns a Task, which can be a much heavier object memory-wise than the actual item you are sending to the block. In the specific example the returned task is always completed, because the BufferBlock has unbounded capacity, and so the memory footprint of the task is practically zero (the same cached Task<bool> instance is returned all the time). But after configuring the block with a small BoundedCapacity 值,事情会很快变得有趣(以一种不愉快的方式)。每次调用 SendAsync 将很快开始返回一个不完整的 Task,每次都不一样,每个任务占用大约 200 字节的内存(如果还使用 CancellationToken 参数则为 300 字节)。这显然不会很好地扩展。

解决方案是按照预期的方式使用 SendAsync。这意味着应该等待它:

for (int i = 0; i < int.MaxValue; i++)
{
    await bufferBlock.SendAsync(GenerateItem()); // It's OK now
}

这样,生产者将被异步阻塞,直到块内有可用空间来容纳已发送的项目。希望这是你想要的。否则,如果不想阻塞生产者,就不要使用异步SendAsync方法,而是使用同步Post方法:

for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        bool accepted = bufferBlock.Post(item); // Synchronous call
        if (accepted) break; // Break the inner loop
        if (bufferBlock.Completion.IsCompleted) return; // Break both loops

        // Here do other things for a while, before retrying to post the item
    }
}

或者您可以使用较低的杠杆 OfferMessage 方法(而不是 PostSendAsync):

for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        var offerResult = ((ITargetBlock<Item>)bufferBlock).OfferMessage(
            new DataflowMessageHeader(1L), item, null, false);
        if (offerResult == DataflowMessageStatus.Accepted) break;
        if (offerResult == DataflowMessageStatus.DecliningPermanently) return;

        // Here do other things for a while, before retrying to offer the item
    }
}

幻数 1L 是 TPL 数据流 source code 内部声明的值,表示:

A well-known message ID for code that will send exactly one message, or where the exact message ID is not important.