在并发数据处理期间如何避免 运行 内存不足?

How to avoid running out of RAM, during a concurrent data proccessing?

我对数据并发处理有疑问。我的 PC 运行RAM 很快就用完了。关于如何修复我的并发实现的任何建议?

常见 class:

public class CalculationResult
{
    public int Count { get; set; }
    public decimal[] RunningTotals { get; set; }

    public CalculationResult(decimal[] profits)
    {
        this.Count = 1;
        this.RunningTotals = new decimal[12];
        profits.CopyTo(this.RunningTotals, 0);
    }

    public void Update(decimal[] newData)
    {
        this.Count++;

        // summ arrays
        for (int i = 0; i < 12; i++)
            this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
    }

    public void Update(CalculationResult otherResult)
    {
        this.Count += otherResult.Count;

        // summ arrays
        for (int i = 0; i < 12; i++)
            this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
    }
}

代码的单核实现如下:

Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
    // do the processing
    // ..
    string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing

    if (combinations.ContainsKey(combination))
        combinations[combination].Update(newData);
    else
        combinations.Add(combination, new CalculationResult(newData));
}

多核实现:

ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) => 
{
    Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
    // do the processing
    // ..
    // add combination to combinations -> same logic as in single core implementation
    results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
    foreach (var pair in item)
    {
        if (combinationsReal.ContainsKey(pair.Key))
            combinationsReal[pair.Key].Update(pair.Value);
        else
            combinationsReal.Add(pair.Key, pair.Value);
    }
}

我遇到的问题是几乎每个 combinations 字典都以 930k 条记录 结束,平均消耗 400 [MB]内存内存。

现在,在单核实现中只有一个这样的字典。所有检查都是针对一本字典执行的。但这是一种缓慢的方法,我想使用多核优化。

在多核实现中,创建了一个 ConcurrentBag 实例,其中包含所有 combinations 字典。一旦多线程作业完成 - 所有字典都聚合为一个。这种方法适用于少量并发迭代。例如,对于 4 次迭代,我的 RAM 用法是 ~ 1.5 [GB]。问题出现了,当我设置并行迭代的全部数量时,即 200!再多的PCRAM也装不下所有的词典,每部都有百万条记录!

我一直在考虑使用 ConcurrentDictioanary,直到我发现 "TryAdd" 方法在我的情况下无法保证添加数据的完整性,因为我还需要运行 更新 运行 总计。

唯一真正的多线程选项是将它们保存到某个数据库,而不是将所有 combinations 添加到字典中。然后,数据聚合将成为带有 group by 子句的 1 SQL select 语句的问题......但我不喜欢创建临时 table 和 运行宁数据库实例就是为了那个..

是否有关于如何同时处理数据而不是 运行 内存不足的变通方法?

编辑: 也许真正的问题应该是——如何在使用 ConcurrentDictionary 时使 RunningTotals 的更新线程安全?我在这个 thread 中只有 运行,与 ConcurrentDictionary 有类似的问题,但我的情况似乎更复杂,因为我有一个数组需要更新。我还在调查这件事。

EDIT2: 这是 ConcurrentDictionary 的有效解决方案。我需要做的就是为字典键添加一把锁。

ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) => 
{
    // do the processing
    // ..
    string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing

    if (combinations.ContainsKey(combination)) {
        lock(combinations[combination])
            combinations[combination].Update(newData);
    }
    else
        combinations.TryAdd(combination, new CalculationResult(newData));
});

单线程代码执行时间为 1m 48s,而此解决方案执行时间为 1m 7s 4 次迭代(性能提高 37%)。我仍然想知道 SQL 方法是否会更快,有数百万条记录?我可能明天会测试它并更新。

编辑 3: 对于那些想知道 ConcurrentDictionary 值更新有什么问题的人 - 运行 此代码有锁和无锁。

public class Result
{
    public int Count { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Start");

        List<int> keys = new List<int>();
        for (int i = 0; i < 100; i++)
            keys.Add(i);

        ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
        Parallel.For(0, 8, i =>
        {
            foreach(var key in keys)
            {
                if (dict.ContainsKey(key))
                {
                    //lock (dict[key]) // uncomment this
                        dict[key].Count++;
                }
                else
                    dict.TryAdd(key, new Result());
            }
        });

        // any output here is incorrect behavior. best result = no lines
        foreach (var item in dict)
            if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }

        Console.WriteLine($"Finish");
        Console.ReadKey();
    }
}

编辑 4: 经过反复试验,我无法优化 SQL 方法。结果证明这是最糟糕的主意 :) 我使用了 SQL Lite 数据库。内存中和文件中。使用 t运行saction 和可重复使用的 SQL 命令参数。由于需要插入大量记录 -​​ 性能不足。数据聚合是最简单的部分,但仅插入 400 万行就需要大量时间,我什至无法想象如何有效地处理 2.4 亿数据.. 到目前为止(还有 ​​st运行gely), ConcurrentBag 方法在我的电脑上似乎是最快的。其次是 ConcurrentDictionary 方法。不过,ConcurrentBag 的内存有点重。感谢 @Alisson 的工作 - 现在可以将它用于更大的迭代集!

所以,您只需要确保您的并发迭代不超过 4 个,这是您的计算机资源的限制,仅使用这台计算机,没有什么神奇之处。

我创建了一个class来控制并发执行以及它将执行的并发任务数。

class 将拥有这些属性:

public class ConcurrentCalculationProcessor
{
    private const int MAX_CONCURRENT_TASKS = 4;
    private readonly IEnumerable<int> _codes;
    private readonly List<Task<Dictionary<string, CalculationResult>>> _tasks;
    private readonly Dictionary<string, CalculationResult> _combinationsReal;

    public ConcurrentCalculationProcessor(IEnumerable<int> codes)
    {
        this._codes = codes;
        this._tasks = new List<Task<Dictionary<string, CalculationResult>>>();
        this._combinationsReal = new Dictionary<string, CalculationResult>();
    }
}

我把并发任务数设为const,但是可能是构造函数中的一个参数

我创建了一个方法来处理这个过程。出于测试目的,我模拟了一个通过 900k itens 的循环,将它们添加到字典中,最后返回它们:

private async Task<Dictionary<string, CalculationResult>> ProcessCombinations()
{
    Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
    // do the processing
    // here we should do something that worth using concurrency
    // like querying databases, consuming APIs/WebServices, and other I/O stuff
    for (int i = 0; i < 950000; i++)
        combinations[i.ToString()] = new CalculationResult(new decimal[] { 1, 10, 15 });
    return await Task.FromResult(combinations);
}

main 方法将并行启动任务,将它们添加到任务列表中,因此我们可以最近跟踪它们。

每次列表达到最大并发任务时,我们await一个方法叫做ProcessRealCombinations

public async Task<Dictionary<string, CalculationResult>> Execute()
{
    ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();

    for (int i = 0; i < this._codes.Count(); i++)
    {
        // start the task imediately
        var task = ProcessCombinations();
        this._tasks.Add(task);
        if (this._tasks.Count() >= MAX_CONCURRENT_TASKS)
        {
            // if we have more than MAX_CONCURRENT_TASKS in progress, we start processing some of them
            // this will await any of the current tasks to complete, them process it (and any other task which may have been completed as well)...
            await ProcessCompletedTasks().ConfigureAwait(false);
        }
    }

    // keep processing until all the pending tasks have been completed...it should be no more than MAX_CONCURRENT_TASKS
    while(this._tasks.Any())
        await ProcessCompletedTasks().ConfigureAwait(false);

    return this._combinationsReal;
}

下一个方法ProcessCompletedTasks将等待至少一个现有任务完成。之后,它将从列表中获取所有已完成的任务(完成的任务和可能一起完成的任何其他任务),并获得它们的结果(组合)。

对于每个 processedCombinations,它将与 this._combinationsReal 合并(使用您在问题中提供的相同逻辑)。

private async Task ProcessCompletedTasks()
{
    await Task.WhenAny(this._tasks).ConfigureAwait(false);
    var completedTasks = this._tasks.Where(t => t.IsCompleted).ToArray();
    // completedTasks will have at least one task, but it may have more ;)
    foreach (var completedTask in completedTasks)
    {
        var processedCombinations = await completedTask.ConfigureAwait(false);
        foreach (var pair in processedCombinations)
        {
            if (this._combinationsReal.ContainsKey(pair.Key))
                this._combinationsReal[pair.Key].Update(pair.Value);
            else
                this._combinationsReal.Add(pair.Key, pair.Value);
        }
        this._tasks.Remove(completedTask);
    }
}

对于每个 processedCombinations 合并到 _combinationsReal,它将从列表中删除其各自的任务,然后继续(再次开始添加更多任务)。这将发生,直到我们为所有迭代创建了所有任务。

最后,我们继续处理它,直到列表中没有更多任务。


如果您监控 RAM 消耗,您会注意到它会增加到大约 1.5 GB(当我们同时处理 4 个任务时),然后减少到大约 0.8 GB(当我们从列表中删除任务时)。至少我的电脑是这样的。

这是一个 fiddle,但是我不得不将 itens 的数量从 900k 减少到 100,因为 fiddle 限制内存使用以避免滥用。

希望对你有所帮助。


关于所有这些东西要注意的一件事是,如果您的 ProcessCombinations(处理那些 900k 项目时同时执行的方法)调用 ,您将受益于使用并发任务外部资源,例如从硬盘读取文件、在数据库中执行查询、调用API/WebService 方法。我猜该代码是可能从外部资源读取 900k 项,然后这将减少处理它所需的时间。

如果项目之前已加载并且 ProcessCombinations 只是读取 已经在内存中的数据 ,那么并发性 将无济于事 (实际上我相信这会使您的代码 运行 变慢)。如果是这样,那么我们在错误的地方应用了并发。

当所述调用要访问外部资源(获取或存储数据)时,使用 async 并行调用可能会有更多帮助,并且取决于外部资源可以支持多少个并发调用,它可能仍然没有什么不同。