TPL 数据流块消耗所有可用内存
TPL Dataflow block consumes all available memory
我有一个 TransformManyBlock
设计如下:
- 输入:文件路径
- 输出:IEnumerable 文件内容,一次一行
我 运行 这个块在一个巨大的文件 (61GB) 上,它太大而无法放入 RAM。为了避免无限制的内存增长,我已将此块和所有下游块的 BoundedCapacity
设置为一个非常低的值(例如 1)。尽管如此,该块显然贪婪地迭代了 IEnumerable,这消耗了计算机上的所有可用内存,使每个进程都停止了。块的 OutputCount 继续无限上升,直到我终止进程。
我能做些什么来防止块以这种方式消耗 IEnumerable
?
编辑:这是一个说明问题的示例程序:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static IEnumerable<string> GetSequence(char c)
{
for (var i = 0; i < 1024 * 1024; ++i)
yield return new string(c, 1024 * 1024);
}
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
var secondBlock = new ActionBlock<string>(str =>
{
Console.WriteLine(str.Substring(0, 10));
Thread.Sleep(1000);
}, options);
firstBlock.LinkTo(secondBlock);
firstBlock.Completion.ContinueWith(task =>
{
if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
else secondBlock.Complete();
});
firstBlock.Post('A');
firstBlock.Complete();
for (; ; )
{
Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
Thread.Sleep(3000);
}
}
}
如果您使用的是 64 位机器,请务必清除 Visual Studio 中的 "Prefer 32-bit" 选项。我的计算机上有 16GB 的 RAM,这个程序会立即消耗所有可用字节。
您似乎误解了 TPL 数据流的工作原理。
BoundedCapacity
限制了您可以 post 放入块中的项目数量。在您的情况下,这意味着单个 char
进入 TransformManyBlock
和单个 string
进入 ActionBlock
.
所以你 post 一个项目到 TransformManyBlock
然后 returns 1024*1024
字符串并尝试将它们传递给 ActionBlock
这将一次只接受一个。其余的字符串将位于 TransformManyBlock
的输出队列中。
您可能想要做的是在达到容量时通过等待(同步或其他方式)以流方式创建一个块和 post 个项目:
private static void Main()
{
MainAsync().Wait();
}
private static async Task MainAsync()
{
var block = new ActionBlock<string>(async item =>
{
Console.WriteLine(item.Substring(0, 10));
await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
foreach (var item in GetSequence('A'))
{
await block.SendAsync(item);
}
block.Complete();
await block.Completion;
}
如果管道的输出比率低于 post 比率,消息将在管道上累积,直到内存耗尽或达到某个队列限制。
如果消息的大小很大,进程很快就会耗尽内存。
如果队列已经有一条消息,将BoundedCapacity
设置为1将导致消息被队列拒绝。例如,在批处理等情况下,这不是期望的行为。检查此 post 以获得见解。
这个工作测试说明了我的观点:
//Change BoundedCapacity to +1 to see it fail
[TestMethod]
public void Whosebug()
{
var total = 1000;
var processed = 0;
var block = new ActionBlock<int>(
(messageUnit) =>
{
Thread.Sleep(10);
Trace.WriteLine($"{messageUnit}");
processed++;
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = -1 }
);
for (int i = 0; i < total; i++)
{
var result = block.SendAsync(i);
Assert.IsTrue(result.IsCompleted, $"failed for {i}");
}
block.Complete();
block.Completion.Wait();
Assert.AreEqual(total, processed);
}
所以我的方法是限制post,所以管道不会在队列中积累太多消息。
下面是一个简单的方法。
这样数据流会一直全速处理消息,但不会累积消息,这样可以避免过多的内存消耗。
//Should be adjusted for specific use.
public void postAssync(Message message)
{
while (totalPending = block1.InputCount + ... + blockn.InputCount> 100)
{
Thread.Sleep(200);
//Note: if allocating huge quantities for of memory for each message the Garbage collector may keep up with the pace.
//This is the perfect place to force garbage collector to release memory.
}
block1.SendAssync(message)
}
似乎要创建一个输出有界的TransformManyBlock
,需要三个内部块:
- A
TransformBlock
接收输入并产生 IEnumerable
s,运行 可能并行。
- 一个非并行
ActionBlock
,枚举生成的 IEnumerable
,并传播最终结果。
- A
BufferBlock
存储最终结果的地方,尊重理想的 BoundedCapacity
。
稍微棘手的部分是如何传播第二个块的完成,因为它没有直接链接到第三个块。下面的实现中,方法PropagateCompletion
是根据库的source code写的
public static IPropagatorBlock<TInput, TOutput>
CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, Task<IEnumerable<TOutput>>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
if (dataflowBlockOptions == null)
throw new ArgumentNullException(nameof(dataflowBlockOptions));
var input = new TransformBlock<TInput, IEnumerable<TOutput>>(transform,
dataflowBlockOptions);
var output = new BufferBlock<TOutput>(dataflowBlockOptions);
var middle = new ActionBlock<IEnumerable<TOutput>>(async results =>
{
if (results == null) return;
foreach (var result in results)
{
var accepted = await output.SendAsync(result).ConfigureAwait(false);
if (!accepted) break; // If one is rejected, the rest will be rejected too
}
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = dataflowBlockOptions.MaxDegreeOfParallelism,
CancellationToken = dataflowBlockOptions.CancellationToken,
SingleProducerConstrained = true,
});
input.LinkTo(middle, new DataflowLinkOptions() { PropagateCompletion = true });
PropagateCompletion(middle, output);
return DataflowBlock.Encapsulate(input, output);
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try
{
await source.Completion.ConfigureAwait(false);
}
catch { }
var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
}
}
// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, IEnumerable<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
item => Task.FromResult(transform(item)), dataflowBlockOptions);
}
用法示例:
var firstBlock = CreateOutputBoundedTransformManyBlock<char, string>(
c => GetSequence(c), options);
我有一个 TransformManyBlock
设计如下:
- 输入:文件路径
- 输出:IEnumerable 文件内容,一次一行
我 运行 这个块在一个巨大的文件 (61GB) 上,它太大而无法放入 RAM。为了避免无限制的内存增长,我已将此块和所有下游块的 BoundedCapacity
设置为一个非常低的值(例如 1)。尽管如此,该块显然贪婪地迭代了 IEnumerable,这消耗了计算机上的所有可用内存,使每个进程都停止了。块的 OutputCount 继续无限上升,直到我终止进程。
我能做些什么来防止块以这种方式消耗 IEnumerable
?
编辑:这是一个说明问题的示例程序:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static IEnumerable<string> GetSequence(char c)
{
for (var i = 0; i < 1024 * 1024; ++i)
yield return new string(c, 1024 * 1024);
}
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
var secondBlock = new ActionBlock<string>(str =>
{
Console.WriteLine(str.Substring(0, 10));
Thread.Sleep(1000);
}, options);
firstBlock.LinkTo(secondBlock);
firstBlock.Completion.ContinueWith(task =>
{
if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
else secondBlock.Complete();
});
firstBlock.Post('A');
firstBlock.Complete();
for (; ; )
{
Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
Thread.Sleep(3000);
}
}
}
如果您使用的是 64 位机器,请务必清除 Visual Studio 中的 "Prefer 32-bit" 选项。我的计算机上有 16GB 的 RAM,这个程序会立即消耗所有可用字节。
您似乎误解了 TPL 数据流的工作原理。
BoundedCapacity
限制了您可以 post 放入块中的项目数量。在您的情况下,这意味着单个 char
进入 TransformManyBlock
和单个 string
进入 ActionBlock
.
所以你 post 一个项目到 TransformManyBlock
然后 returns 1024*1024
字符串并尝试将它们传递给 ActionBlock
这将一次只接受一个。其余的字符串将位于 TransformManyBlock
的输出队列中。
您可能想要做的是在达到容量时通过等待(同步或其他方式)以流方式创建一个块和 post 个项目:
private static void Main()
{
MainAsync().Wait();
}
private static async Task MainAsync()
{
var block = new ActionBlock<string>(async item =>
{
Console.WriteLine(item.Substring(0, 10));
await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
foreach (var item in GetSequence('A'))
{
await block.SendAsync(item);
}
block.Complete();
await block.Completion;
}
如果管道的输出比率低于 post 比率,消息将在管道上累积,直到内存耗尽或达到某个队列限制。 如果消息的大小很大,进程很快就会耗尽内存。
如果队列已经有一条消息,将BoundedCapacity
设置为1将导致消息被队列拒绝。例如,在批处理等情况下,这不是期望的行为。检查此 post 以获得见解。
这个工作测试说明了我的观点:
//Change BoundedCapacity to +1 to see it fail
[TestMethod]
public void Whosebug()
{
var total = 1000;
var processed = 0;
var block = new ActionBlock<int>(
(messageUnit) =>
{
Thread.Sleep(10);
Trace.WriteLine($"{messageUnit}");
processed++;
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = -1 }
);
for (int i = 0; i < total; i++)
{
var result = block.SendAsync(i);
Assert.IsTrue(result.IsCompleted, $"failed for {i}");
}
block.Complete();
block.Completion.Wait();
Assert.AreEqual(total, processed);
}
所以我的方法是限制post,所以管道不会在队列中积累太多消息。
下面是一个简单的方法。 这样数据流会一直全速处理消息,但不会累积消息,这样可以避免过多的内存消耗。
//Should be adjusted for specific use.
public void postAssync(Message message)
{
while (totalPending = block1.InputCount + ... + blockn.InputCount> 100)
{
Thread.Sleep(200);
//Note: if allocating huge quantities for of memory for each message the Garbage collector may keep up with the pace.
//This is the perfect place to force garbage collector to release memory.
}
block1.SendAssync(message)
}
似乎要创建一个输出有界的TransformManyBlock
,需要三个内部块:
- A
TransformBlock
接收输入并产生IEnumerable
s,运行 可能并行。 - 一个非并行
ActionBlock
,枚举生成的IEnumerable
,并传播最终结果。 - A
BufferBlock
存储最终结果的地方,尊重理想的BoundedCapacity
。
稍微棘手的部分是如何传播第二个块的完成,因为它没有直接链接到第三个块。下面的实现中,方法PropagateCompletion
是根据库的source code写的
public static IPropagatorBlock<TInput, TOutput>
CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, Task<IEnumerable<TOutput>>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
if (dataflowBlockOptions == null)
throw new ArgumentNullException(nameof(dataflowBlockOptions));
var input = new TransformBlock<TInput, IEnumerable<TOutput>>(transform,
dataflowBlockOptions);
var output = new BufferBlock<TOutput>(dataflowBlockOptions);
var middle = new ActionBlock<IEnumerable<TOutput>>(async results =>
{
if (results == null) return;
foreach (var result in results)
{
var accepted = await output.SendAsync(result).ConfigureAwait(false);
if (!accepted) break; // If one is rejected, the rest will be rejected too
}
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = dataflowBlockOptions.MaxDegreeOfParallelism,
CancellationToken = dataflowBlockOptions.CancellationToken,
SingleProducerConstrained = true,
});
input.LinkTo(middle, new DataflowLinkOptions() { PropagateCompletion = true });
PropagateCompletion(middle, output);
return DataflowBlock.Encapsulate(input, output);
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try
{
await source.Completion.ConfigureAwait(false);
}
catch { }
var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
}
}
// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, IEnumerable<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
item => Task.FromResult(transform(item)), dataflowBlockOptions);
}
用法示例:
var firstBlock = CreateOutputBoundedTransformManyBlock<char, string>(
c => GetSequence(c), options);