在并发数据处理期间如何避免 运行 内存不足?
How to avoid running out of RAM, during a concurrent data proccessing?
我对数据并发处理有疑问。我的 PC 运行RAM 很快就用完了。关于如何修复我的并发实现的任何建议?
常见 class:
public class CalculationResult
{
public int Count { get; set; }
public decimal[] RunningTotals { get; set; }
public CalculationResult(decimal[] profits)
{
this.Count = 1;
this.RunningTotals = new decimal[12];
profits.CopyTo(this.RunningTotals, 0);
}
public void Update(decimal[] newData)
{
this.Count++;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
}
public void Update(CalculationResult otherResult)
{
this.Count += otherResult.Count;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
}
}
代码的单核实现如下:
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination))
combinations[combination].Update(newData);
else
combinations.Add(combination, new CalculationResult(newData));
}
多核实现:
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) =>
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// ..
// add combination to combinations -> same logic as in single core implementation
results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
foreach (var pair in item)
{
if (combinationsReal.ContainsKey(pair.Key))
combinationsReal[pair.Key].Update(pair.Value);
else
combinationsReal.Add(pair.Key, pair.Value);
}
}
我遇到的问题是几乎每个 combinations
字典都以 930k
条记录 结束,平均消耗 400 [MB]
内存内存。
现在,在单核实现中只有一个这样的字典。所有检查都是针对一本字典执行的。但这是一种缓慢的方法,我想使用多核优化。
在多核实现中,创建了一个 ConcurrentBag
实例,其中包含所有 combinations
字典。一旦多线程作业完成 - 所有字典都聚合为一个。这种方法适用于少量并发迭代。例如,对于 4 次迭代,我的 RAM 用法是 ~ 1.5 [GB]
。问题出现了,当我设置并行迭代的全部数量时,即 200!再多的PCRAM
也装不下所有的词典,每部都有百万条记录!
我一直在考虑使用 ConcurrentDictioanary
,直到我发现 "TryAdd" 方法在我的情况下无法保证添加数据的完整性,因为我还需要运行 更新 运行 总计。
唯一真正的多线程选项是将它们保存到某个数据库,而不是将所有 combinations
添加到字典中。然后,数据聚合将成为带有 group by
子句的 1 SQL select
语句的问题......但我不喜欢创建临时 table 和 运行宁数据库实例就是为了那个..
是否有关于如何同时处理数据而不是 运行 内存不足的变通方法?
编辑:
也许真正的问题应该是——如何在使用 ConcurrentDictionary
时使 RunningTotals
的更新线程安全?我在这个 thread 中只有 运行,与 ConcurrentDictionary
有类似的问题,但我的情况似乎更复杂,因为我有一个数组需要更新。我还在调查这件事。
EDIT2: 这是 ConcurrentDictionary
的有效解决方案。我需要做的就是为字典键添加一把锁。
ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) =>
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination)) {
lock(combinations[combination])
combinations[combination].Update(newData);
}
else
combinations.TryAdd(combination, new CalculationResult(newData));
});
单线程代码执行时间为 1m 48s
,而此解决方案执行时间为 1m 7s
4 次迭代(性能提高 37%)。我仍然想知道 SQL 方法是否会更快,有数百万条记录?我可能明天会测试它并更新。
编辑 3: 对于那些想知道 ConcurrentDictionary
值更新有什么问题的人 - 运行 此代码有锁和无锁。
public class Result
{
public int Count { get; set; }
}
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
List<int> keys = new List<int>();
for (int i = 0; i < 100; i++)
keys.Add(i);
ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
Parallel.For(0, 8, i =>
{
foreach(var key in keys)
{
if (dict.ContainsKey(key))
{
//lock (dict[key]) // uncomment this
dict[key].Count++;
}
else
dict.TryAdd(key, new Result());
}
});
// any output here is incorrect behavior. best result = no lines
foreach (var item in dict)
if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }
Console.WriteLine($"Finish");
Console.ReadKey();
}
}
编辑 4: 经过反复试验,我无法优化 SQL 方法。结果证明这是最糟糕的主意 :) 我使用了 SQL Lite
数据库。内存中和文件中。使用 t运行saction 和可重复使用的 SQL 命令参数。由于需要插入大量记录 - 性能不足。数据聚合是最简单的部分,但仅插入 400 万行就需要大量时间,我什至无法想象如何有效地处理 2.4 亿数据.. 到目前为止(还有 st运行gely), ConcurrentBag
方法在我的电脑上似乎是最快的。其次是 ConcurrentDictionary
方法。不过,ConcurrentBag
的内存有点重。感谢 @Alisson 的工作 - 现在可以将它用于更大的迭代集!
所以,您只需要确保您的并发迭代不超过 4 个,这是您的计算机资源的限制,仅使用这台计算机,没有什么神奇之处。
我创建了一个class来控制并发执行以及它将执行的并发任务数。
class 将拥有这些属性:
public class ConcurrentCalculationProcessor
{
private const int MAX_CONCURRENT_TASKS = 4;
private readonly IEnumerable<int> _codes;
private readonly List<Task<Dictionary<string, CalculationResult>>> _tasks;
private readonly Dictionary<string, CalculationResult> _combinationsReal;
public ConcurrentCalculationProcessor(IEnumerable<int> codes)
{
this._codes = codes;
this._tasks = new List<Task<Dictionary<string, CalculationResult>>>();
this._combinationsReal = new Dictionary<string, CalculationResult>();
}
}
我把并发任务数设为const
,但是可能是构造函数中的一个参数
我创建了一个方法来处理这个过程。出于测试目的,我模拟了一个通过 900k itens 的循环,将它们添加到字典中,最后返回它们:
private async Task<Dictionary<string, CalculationResult>> ProcessCombinations()
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// here we should do something that worth using concurrency
// like querying databases, consuming APIs/WebServices, and other I/O stuff
for (int i = 0; i < 950000; i++)
combinations[i.ToString()] = new CalculationResult(new decimal[] { 1, 10, 15 });
return await Task.FromResult(combinations);
}
main 方法将并行启动任务,将它们添加到任务列表中,因此我们可以最近跟踪它们。
每次列表达到最大并发任务时,我们await
一个方法叫做ProcessRealCombinations
。
public async Task<Dictionary<string, CalculationResult>> Execute()
{
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
for (int i = 0; i < this._codes.Count(); i++)
{
// start the task imediately
var task = ProcessCombinations();
this._tasks.Add(task);
if (this._tasks.Count() >= MAX_CONCURRENT_TASKS)
{
// if we have more than MAX_CONCURRENT_TASKS in progress, we start processing some of them
// this will await any of the current tasks to complete, them process it (and any other task which may have been completed as well)...
await ProcessCompletedTasks().ConfigureAwait(false);
}
}
// keep processing until all the pending tasks have been completed...it should be no more than MAX_CONCURRENT_TASKS
while(this._tasks.Any())
await ProcessCompletedTasks().ConfigureAwait(false);
return this._combinationsReal;
}
下一个方法ProcessCompletedTasks
将等待至少一个现有任务完成。之后,它将从列表中获取所有已完成的任务(完成的任务和可能一起完成的任何其他任务),并获得它们的结果(组合)。
对于每个 processedCombinations
,它将与 this._combinationsReal
合并(使用您在问题中提供的相同逻辑)。
private async Task ProcessCompletedTasks()
{
await Task.WhenAny(this._tasks).ConfigureAwait(false);
var completedTasks = this._tasks.Where(t => t.IsCompleted).ToArray();
// completedTasks will have at least one task, but it may have more ;)
foreach (var completedTask in completedTasks)
{
var processedCombinations = await completedTask.ConfigureAwait(false);
foreach (var pair in processedCombinations)
{
if (this._combinationsReal.ContainsKey(pair.Key))
this._combinationsReal[pair.Key].Update(pair.Value);
else
this._combinationsReal.Add(pair.Key, pair.Value);
}
this._tasks.Remove(completedTask);
}
}
对于每个 processedCombinations
合并到 _combinationsReal
,它将从列表中删除其各自的任务,然后继续(再次开始添加更多任务)。这将发生,直到我们为所有迭代创建了所有任务。
最后,我们继续处理它,直到列表中没有更多任务。
如果您监控 RAM 消耗,您会注意到它会增加到大约 1.5 GB(当我们同时处理 4 个任务时),然后减少到大约 0.8 GB(当我们从列表中删除任务时)。至少我的电脑是这样的。
这是一个 fiddle,但是我不得不将 itens 的数量从 900k 减少到 100,因为 fiddle 限制内存使用以避免滥用。
希望对你有所帮助。
关于所有这些东西要注意的一件事是,如果您的 ProcessCombinations
(处理那些 900k 项目时同时执行的方法)调用 ,您将受益于使用并发任务外部资源,例如从硬盘读取文件、在数据库中执行查询、调用API/WebService 方法。我猜该代码是可能从外部资源读取 900k 项,然后这将减少处理它所需的时间。
如果项目之前已加载并且 ProcessCombinations
只是读取 已经在内存中的数据 ,那么并发性 将无济于事 (实际上我相信这会使您的代码 运行 变慢)。如果是这样,那么我们在错误的地方应用了并发。
当所述调用要访问外部资源(获取或存储数据)时,使用 async
并行调用可能会有更多帮助,并且取决于外部资源可以支持多少个并发调用,它可能仍然没有什么不同。
我对数据并发处理有疑问。我的 PC 运行RAM 很快就用完了。关于如何修复我的并发实现的任何建议?
常见 class:
public class CalculationResult
{
public int Count { get; set; }
public decimal[] RunningTotals { get; set; }
public CalculationResult(decimal[] profits)
{
this.Count = 1;
this.RunningTotals = new decimal[12];
profits.CopyTo(this.RunningTotals, 0);
}
public void Update(decimal[] newData)
{
this.Count++;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
}
public void Update(CalculationResult otherResult)
{
this.Count += otherResult.Count;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
}
}
代码的单核实现如下:
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination))
combinations[combination].Update(newData);
else
combinations.Add(combination, new CalculationResult(newData));
}
多核实现:
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) =>
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// ..
// add combination to combinations -> same logic as in single core implementation
results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
foreach (var pair in item)
{
if (combinationsReal.ContainsKey(pair.Key))
combinationsReal[pair.Key].Update(pair.Value);
else
combinationsReal.Add(pair.Key, pair.Value);
}
}
我遇到的问题是几乎每个 combinations
字典都以 930k
条记录 结束,平均消耗 400 [MB]
内存内存。
现在,在单核实现中只有一个这样的字典。所有检查都是针对一本字典执行的。但这是一种缓慢的方法,我想使用多核优化。
在多核实现中,创建了一个 ConcurrentBag
实例,其中包含所有 combinations
字典。一旦多线程作业完成 - 所有字典都聚合为一个。这种方法适用于少量并发迭代。例如,对于 4 次迭代,我的 RAM 用法是 ~ 1.5 [GB]
。问题出现了,当我设置并行迭代的全部数量时,即 200!再多的PCRAM
也装不下所有的词典,每部都有百万条记录!
我一直在考虑使用 ConcurrentDictioanary
,直到我发现 "TryAdd" 方法在我的情况下无法保证添加数据的完整性,因为我还需要运行 更新 运行 总计。
唯一真正的多线程选项是将它们保存到某个数据库,而不是将所有 combinations
添加到字典中。然后,数据聚合将成为带有 group by
子句的 1 SQL select
语句的问题......但我不喜欢创建临时 table 和 运行宁数据库实例就是为了那个..
是否有关于如何同时处理数据而不是 运行 内存不足的变通方法?
编辑:
也许真正的问题应该是——如何在使用 ConcurrentDictionary
时使 RunningTotals
的更新线程安全?我在这个 thread 中只有 运行,与 ConcurrentDictionary
有类似的问题,但我的情况似乎更复杂,因为我有一个数组需要更新。我还在调查这件事。
EDIT2: 这是 ConcurrentDictionary
的有效解决方案。我需要做的就是为字典键添加一把锁。
ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) =>
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination)) {
lock(combinations[combination])
combinations[combination].Update(newData);
}
else
combinations.TryAdd(combination, new CalculationResult(newData));
});
单线程代码执行时间为 1m 48s
,而此解决方案执行时间为 1m 7s
4 次迭代(性能提高 37%)。我仍然想知道 SQL 方法是否会更快,有数百万条记录?我可能明天会测试它并更新。
编辑 3: 对于那些想知道 ConcurrentDictionary
值更新有什么问题的人 - 运行 此代码有锁和无锁。
public class Result
{
public int Count { get; set; }
}
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
List<int> keys = new List<int>();
for (int i = 0; i < 100; i++)
keys.Add(i);
ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
Parallel.For(0, 8, i =>
{
foreach(var key in keys)
{
if (dict.ContainsKey(key))
{
//lock (dict[key]) // uncomment this
dict[key].Count++;
}
else
dict.TryAdd(key, new Result());
}
});
// any output here is incorrect behavior. best result = no lines
foreach (var item in dict)
if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }
Console.WriteLine($"Finish");
Console.ReadKey();
}
}
编辑 4: 经过反复试验,我无法优化 SQL 方法。结果证明这是最糟糕的主意 :) 我使用了 SQL Lite
数据库。内存中和文件中。使用 t运行saction 和可重复使用的 SQL 命令参数。由于需要插入大量记录 - 性能不足。数据聚合是最简单的部分,但仅插入 400 万行就需要大量时间,我什至无法想象如何有效地处理 2.4 亿数据.. 到目前为止(还有 st运行gely), ConcurrentBag
方法在我的电脑上似乎是最快的。其次是 ConcurrentDictionary
方法。不过,ConcurrentBag
的内存有点重。感谢 @Alisson 的工作 - 现在可以将它用于更大的迭代集!
所以,您只需要确保您的并发迭代不超过 4 个,这是您的计算机资源的限制,仅使用这台计算机,没有什么神奇之处。
我创建了一个class来控制并发执行以及它将执行的并发任务数。
class 将拥有这些属性:
public class ConcurrentCalculationProcessor
{
private const int MAX_CONCURRENT_TASKS = 4;
private readonly IEnumerable<int> _codes;
private readonly List<Task<Dictionary<string, CalculationResult>>> _tasks;
private readonly Dictionary<string, CalculationResult> _combinationsReal;
public ConcurrentCalculationProcessor(IEnumerable<int> codes)
{
this._codes = codes;
this._tasks = new List<Task<Dictionary<string, CalculationResult>>>();
this._combinationsReal = new Dictionary<string, CalculationResult>();
}
}
我把并发任务数设为const
,但是可能是构造函数中的一个参数
我创建了一个方法来处理这个过程。出于测试目的,我模拟了一个通过 900k itens 的循环,将它们添加到字典中,最后返回它们:
private async Task<Dictionary<string, CalculationResult>> ProcessCombinations()
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// here we should do something that worth using concurrency
// like querying databases, consuming APIs/WebServices, and other I/O stuff
for (int i = 0; i < 950000; i++)
combinations[i.ToString()] = new CalculationResult(new decimal[] { 1, 10, 15 });
return await Task.FromResult(combinations);
}
main 方法将并行启动任务,将它们添加到任务列表中,因此我们可以最近跟踪它们。
每次列表达到最大并发任务时,我们await
一个方法叫做ProcessRealCombinations
。
public async Task<Dictionary<string, CalculationResult>> Execute()
{
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
for (int i = 0; i < this._codes.Count(); i++)
{
// start the task imediately
var task = ProcessCombinations();
this._tasks.Add(task);
if (this._tasks.Count() >= MAX_CONCURRENT_TASKS)
{
// if we have more than MAX_CONCURRENT_TASKS in progress, we start processing some of them
// this will await any of the current tasks to complete, them process it (and any other task which may have been completed as well)...
await ProcessCompletedTasks().ConfigureAwait(false);
}
}
// keep processing until all the pending tasks have been completed...it should be no more than MAX_CONCURRENT_TASKS
while(this._tasks.Any())
await ProcessCompletedTasks().ConfigureAwait(false);
return this._combinationsReal;
}
下一个方法ProcessCompletedTasks
将等待至少一个现有任务完成。之后,它将从列表中获取所有已完成的任务(完成的任务和可能一起完成的任何其他任务),并获得它们的结果(组合)。
对于每个 processedCombinations
,它将与 this._combinationsReal
合并(使用您在问题中提供的相同逻辑)。
private async Task ProcessCompletedTasks()
{
await Task.WhenAny(this._tasks).ConfigureAwait(false);
var completedTasks = this._tasks.Where(t => t.IsCompleted).ToArray();
// completedTasks will have at least one task, but it may have more ;)
foreach (var completedTask in completedTasks)
{
var processedCombinations = await completedTask.ConfigureAwait(false);
foreach (var pair in processedCombinations)
{
if (this._combinationsReal.ContainsKey(pair.Key))
this._combinationsReal[pair.Key].Update(pair.Value);
else
this._combinationsReal.Add(pair.Key, pair.Value);
}
this._tasks.Remove(completedTask);
}
}
对于每个 processedCombinations
合并到 _combinationsReal
,它将从列表中删除其各自的任务,然后继续(再次开始添加更多任务)。这将发生,直到我们为所有迭代创建了所有任务。
最后,我们继续处理它,直到列表中没有更多任务。
如果您监控 RAM 消耗,您会注意到它会增加到大约 1.5 GB(当我们同时处理 4 个任务时),然后减少到大约 0.8 GB(当我们从列表中删除任务时)。至少我的电脑是这样的。
这是一个 fiddle,但是我不得不将 itens 的数量从 900k 减少到 100,因为 fiddle 限制内存使用以避免滥用。
希望对你有所帮助。
关于所有这些东西要注意的一件事是,如果您的 ProcessCombinations
(处理那些 900k 项目时同时执行的方法)调用 ,您将受益于使用并发任务外部资源,例如从硬盘读取文件、在数据库中执行查询、调用API/WebService 方法。我猜该代码是可能从外部资源读取 900k 项,然后这将减少处理它所需的时间。
如果项目之前已加载并且 ProcessCombinations
只是读取 已经在内存中的数据 ,那么并发性 将无济于事 (实际上我相信这会使您的代码 运行 变慢)。如果是这样,那么我们在错误的地方应用了并发。
当所述调用要访问外部资源(获取或存储数据)时,使用 async
并行调用可能会有更多帮助,并且取决于外部资源可以支持多少个并发调用,它可能仍然没有什么不同。