Parallel.For 循环中出现类似"捕获变量"的错误

Captured variable-like error in Parallel.For loop

以下这段代码工作正常:

            // Excerpt: query, copyOfi, dataSource, itemsAtOnce, queryNumber, numberOfRunsNeeded and
            // viewModel are defined outside this snippet.
            Parallel.For(
                0, numberOfRunsNeeded, j => 
                {
                    var copyOfj = j;
                         
                    // Slice of research items for run j: starts at j * itemsAtOnce and holds at most
                    // itemsAtOnce items, clamped to what remains in the list.
                    var researchItems = viewModel.ResearchItems[queryNumber].GetRange((int)(copyOfj * itemsAtOnce), Math.Min(itemsAtOnce, viewModel.ResearchItems[queryNumber].Count - (copyOfj * itemsAtOnce)));

                    // finalQuery is declared inside the lambda, so every invocation of the body has
                    // its own variable, rebuilt from query.BaseQuery.
                    var finalQuery = GetCorrectedQuery(query.BaseQuery, query.SQLVariants[copyOfi]);
                    if (researchItems.Count > 0)
                    {
                        finalQuery = GetCorrectedQueryWithResearchItems(finalQuery, researchItems, query.SQLVariants[copyOfi]);
                    }

                    PerformSingleRun(query, copyOfi, dataSource, finalQuery, copyOfj, viewModel);
                }
            );

下面这段更新后的代码出现了类似"捕获变量"的错误——finalQuery 的结果就好像 For 循环多次重复使用了同一个值:

            // Excerpt: finalQuery, researchItemsPresent, query, copyOfi, dataSource, itemsAtOnce and
            // the parallel options are defined outside this snippet.
            Parallel.For(
                0, numberOfRunsNeeded, parallelOptionsWithMaxDegreeOfParallelism, j =>
                {
                    var copyOfj = j;

                    if (researchItemsPresent)
                    {
                        // Slice of research items for run j: starts at j * itemsAtOnce and holds at
                        // most itemsAtOnce items, clamped to what remains in the list.
                        var researchItems = ViewModel.ResearchItems[queryNumber].GetRange(copyOfj * itemsAtOnce, Math.Min(itemsAtOnce, ViewModel.ResearchItems[queryNumber].Count - (copyOfj * itemsAtOnce)));
                        // NOTE(review): finalQuery is not declared in this lambda, so it is a variable
                        // captured from the enclosing scope and shared by all iterations. It is both
                        // read (as the first argument) and overwritten here without synchronization.
                        finalQuery = GetAdaptedBaseQueryWithResearchItemsInserted(finalQuery, researchItems, query.SQLVariants[copyOfi]);
                    }

                    PerformSingleRun(query, copyOfi, dataSource, finalQuery, copyOfj, viewModel);
                }
            );

如上所述,我已经有了工作代码——我只是想了解我在修订中做错了什么。完整方法如下:


之前的版本(可以正常工作):

    /// <summary>
    /// Builds a Query from the entries of <paramref name="viewModel"/> at index
    /// <paramref name="queryNumber"/>, runs it against every selected database in batches of
    /// research items (batches of one database run in parallel), then combines the results.
    /// </summary>
    /// <param name="queryNumber">Index used to look up this query's settings and research items.</param>
    /// <param name="viewModel">
    /// Source of the query settings and research items; its QueryRunsCompletedMaximum is increased
    /// by the number of runs scheduled for each database.
    /// </param>
    public static void ProcessSingleQuery(int queryNumber, ViewModel viewModel)
    {
        var query = new Query
        {
            Name = viewModel.QueryNames[queryNumber],
            BaseQuery = viewModel.BaseQueries[queryNumber],
            SelectedDatabases = viewModel.SelectedDatabases[queryNumber],
            SQLVariants = viewModel.SQLVariants[queryNumber],
            Usernames = viewModel.Usernames[queryNumber],
            Passwords = viewModel.Passwords[queryNumber],
            CSVFiles = viewModel.CSVFiles[queryNumber],
            CSVFileAliases = viewModel.CSVFileAliases[queryNumber],
            ColumnDelimiters = viewModel.ColumnDelimiters[queryNumber],
            HeaderRowsPresent = viewModel.HeaderRowsPresent[queryNumber],
            TextDelimiters = viewModel.TextDelimiters[queryNumber],
            ResearchItemColumnNumber = viewModel.ResearchItemColumnNumber[queryNumber]
        };


        // One pass per selected database; SQLVariants is indexed in step with SelectedDatabases.
        for (var i = 0; i < query.SelectedDatabases.Count; i++)
        {
            var dataSource = GetDataSource(query.SelectedDatabases[i]);
            // Batch size (research items per run) for this database's SQL variant.
            var itemsAtOnce = ViewModel.ItemsAtOnceBySQLVariant[query.SQLVariants[i]];
            if (query.SelectedDatabases[i].Equals("CSV"))
            {
                // For CSV, the data source becomes the directory of the first CSV file.
                RefreshOrCreateSchemaIniFile(query);
                dataSource = query.CSVFiles[0].DirectoryName;
            }


            // ceil(item count / batch size), but at least one run so the query still executes
            // when there are no research items.
            var numberOfRunsNeeded = Math.Max(
                (int)Math.Ceiling((double)viewModel.ResearchItems[queryNumber].Count / itemsAtOnce), 1
            );

            viewModel.QueryRunsCompletedMaximum += numberOfRunsNeeded;

            // Copy of the loop index for use inside the lambda below.
            var copyOfi = i;

            Parallel.For(
                0, numberOfRunsNeeded, j => 
                {
                    var copyOfj = j;
                         
                    // Slice of research items for run j: starts at j * itemsAtOnce and holds at most
                    // itemsAtOnce items, clamped to what remains in the list.
                    var researchItems = viewModel.ResearchItems[queryNumber].GetRange((int)(copyOfj * itemsAtOnce), Math.Min(itemsAtOnce, viewModel.ResearchItems[queryNumber].Count - (copyOfj * itemsAtOnce)));

                    // finalQuery is local to the lambda and rebuilt from query.BaseQuery on every
                    // invocation, so no iteration sees a value written by another one.
                    var finalQuery = GetCorrectedQuery(query.BaseQuery, query.SQLVariants[copyOfi]);
                    if (researchItems.Count > 0)
                    {
                        finalQuery = GetCorrectedQueryWithResearchItems(finalQuery, researchItems, query.SQLVariants[copyOfi]);
                    }

                    PerformSingleRun(query, copyOfi, dataSource, finalQuery, copyOfj, viewModel);
                }
            );
        }

        // After all databases are processed, combine the results using the desktop folder path.
        GeneralTools.CombineAndDeleteQueryResults(Environment.GetFolderPath(Environment.SpecialFolder.Desktop), query.Name);
        // The missing-items report is only compiled when a research item column number is set (non-zero).
        if (query.ResearchItemColumnNumber != 0)
        {
            CompileMissingItemsReport(Environment.GetFolderPath(Environment.SpecialFolder.Desktop), query.Name, viewModel, queryNumber);
        }
    }

修改后的版本(有问题):

    /// <summary>
    /// Builds a Query from the settings at index <paramref name="queryNumber"/>, runs it against
    /// every selected database in batches of research items (batches of one database run in
    /// parallel), then combines the results.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the Parallel.For body reads and overwrites the finalQuery variable declared in
    /// the enclosing for-loop scope without synchronization, so parallel iterations can build on
    /// each other's values (race condition).
    /// NOTE(review): settings are read through <c>ViewModel.</c> while the
    /// <paramref name="viewModel"/> parameter is still used for QueryRunsCompletedMaximum and
    /// PerformSingleRun; presumably the former are static members - confirm.
    /// </remarks>
    /// <param name="queryNumber">Index used to look up this query's settings and research items.</param>
    /// <param name="viewModel">
    /// Instance whose QueryRunsCompletedMaximum is increased by the number of runs scheduled for
    /// each database; also passed on to PerformSingleRun.
    /// </param>
    public static void ProcessSingleQuery(int queryNumber, ViewModel viewModel)
    {
        var query = new Query
        {
            Name = ViewModel.QueryNames[queryNumber],
            BaseQuery = ViewModel.BaseQueries[queryNumber],
            SelectedDatabases = ViewModel.SelectedDatabases[queryNumber],
            SQLVariants = ViewModel.SQLVariants[queryNumber],
            Usernames = ViewModel.Usernames[queryNumber],
            Passwords = ViewModel.Passwords[queryNumber],
            CSVFiles = ViewModel.CSVFiles[queryNumber],
            CSVFileAliases = ViewModel.CSVFileAliases[queryNumber],
            ColumnDelimiters = ViewModel.ColumnDelimiters[queryNumber],
            HeaderRowsPresent = ViewModel.HeaderRowsPresent[queryNumber],
            TextDelimiters = ViewModel.TextDelimiters[queryNumber],
            ResearchItemColumnNumber = ViewModel.ResearchItemColumnNumber[queryNumber]
        };


        // One pass per selected database; SQLVariants is indexed in step with SelectedDatabases.
        for (var i = 0; i < query.SelectedDatabases.Count; i++)
        {
            // Declared once per database, OUTSIDE the Parallel.For lambda below: every parallel
            // iteration for this database captures this same variable.
            var finalQuery = GetAdaptedBaseQuery(query, query.SQLVariants[i]);
            var dataSource = GetDataSource(query.SelectedDatabases[i]);
            // Batch size (research items per run) for this database's SQL variant.
            var itemsAtOnce = ViewModel.ItemsAtOnceBySQLVariant[query.SQLVariants[i]];

            if (query.SelectedDatabases[i].Contains("CSV"))
            {
                // For CSV, the data source becomes the directory of the first CSV file.
                CreateSchemaIniFile(query);
                dataSource = query.CSVFiles[0].DirectoryName;
            }

            var researchItemsPresent = ViewModel.ResearchItems[queryNumber].Count > 0;
            // ceil(item count / batch size), but at least one run so the query still executes
            // when there are no research items.
            var numberOfRunsNeeded = Math.Max(
                (int)Math.Ceiling((double)ViewModel.ResearchItems[queryNumber].Count / itemsAtOnce), 1
            );

            viewModel.QueryRunsCompletedMaximum += numberOfRunsNeeded;

            // Copy of the loop index for use inside the lambda below.
            var copyOfi = i;

            // Teradata runs are capped at 6 concurrent iterations; -1 places no limit on the
            // degree of parallelism.
            var parallelOptionsWithMaxDegreeOfParallelism = new ParallelOptions
            {
                MaxDegreeOfParallelism =
                query.SQLVariants[i] == SQLVariant.Teradata ? 6 : -1
            };

            Parallel.For(
                0, numberOfRunsNeeded, parallelOptionsWithMaxDegreeOfParallelism, j =>
                {
                    var copyOfj = j;

                    if (researchItemsPresent)
                    {
                        // Slice of research items for run j: starts at j * itemsAtOnce and holds at
                        // most itemsAtOnce items, clamped to what remains in the list.
                        var researchItems = ViewModel.ResearchItems[queryNumber].GetRange(copyOfj * itemsAtOnce, Math.Min(itemsAtOnce, ViewModel.ResearchItems[queryNumber].Count - (copyOfj * itemsAtOnce)));
                        // NOTE(review): race condition - the captured finalQuery is passed in as the
                        // base and then overwritten, so the input may be a value another iteration
                        // already modified. A variable declared inside this lambda and derived from
                        // the unmodified base query would keep iterations independent.
                        finalQuery = GetAdaptedBaseQueryWithResearchItemsInserted(finalQuery, researchItems, query.SQLVariants[copyOfi]);
                    }

                    // NOTE(review): the finalQuery read here is the shared variable as well, so it
                    // may hold another iteration's query by the time this call runs.
                    PerformSingleRun(query, copyOfi, dataSource, finalQuery, copyOfj, viewModel);
                }
            );
        }

        // After all databases are processed, combine the results using the desktop folder path.
        GeneralTools.CombineAndDeleteQueryResults(Environment.GetFolderPath(Environment.SpecialFolder.Desktop), query.Name);
        // The missing-items report is only compiled when a research item column number is set (non-zero).
        if (query.ResearchItemColumnNumber != 0)
        {
            CompileMissingItemsReport(Environment.GetFolderPath(Environment.SpecialFolder.Desktop), query.Name, queryNumber);
        }
    }

为什么修改后的版本会出错

问题似乎有两个方面:

首先,您在外部作用域中声明了一个名为 finalQuery 的变量,并且在闭包(也就是作为 Parallel.For 主体传入的 lambda 委托)中使用了它,因此 Parallel.For 的所有迭代访问的是同一个变量。

其次,您在同一个 Parallel.For 主体中既读取又写入这个 finalQuery 变量,具体来说就是这行代码:

finalQuery = GetAdaptedBaseQueryWithResearchItemsInserted(finalQuery, ...)

...您将在此处看到您将 finalQuery 的当前值作为基本查询传递。

该循环的各次迭代到达这行代码的顺序并不固定,取决于系统架构和处理器负载,从而导致竞争条件(race condition)。对该变量的访问也没有用 lock 加以保护。

为什么其他版本有效

在您可以正常工作的版本中,finalQuery 是在 lambda 内部声明的变量,因此完全是 Parallel.For 主体函数的局部变量。这样任何一次迭代都看不到其他迭代中 finalQuery 的值。更重要的是,每个 finalQuery 都是从一个公共的、不变的基础查询 (query.BaseQuery) 构造出来的,代码如下:

var finalQuery = GetCorrectedQuery(query.BaseQuery, ...)

虽然您在下面的行中进一步调整了 finalQuery 的值:

finalQuery = GetCorrectedQueryWithResearchItems(finalQuery, ...)

...这没有问题,因为这个 finalQuery 变量是 lambda 函数的局部变量,它的值只取决于前一行的结果,而不会像出现竞争条件的版本那样,是 Parallel.For 的其他迭代写入的值。