Parallel.ForEach 丢失数据
Parallel.ForEach losing data
Parallel.ForEach 有助于提高性能,但是,我发现数据丢失了。
已尝试 - 变量结果、处理数据为 ConcurrentBag<IwrRows>
1)
Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData) =>
{
return ProcessData(n); // ProcessData complicated business logic
}, (localData) => AddRows(localData, processedData, obj)
);
2)
await Task.Run(() => Parallel.ForEach(results, item =>
{
ProcessData(item, processedData);
}));
3)
Parallel.ForEach(results, item =>
{
ProcessData(item, processedData);
});
他们都丢失了一些行。
当我使用 foreach 块时,它 returns 始终是相同的值,但是,它慢了 4 倍。
foreach (var item in results)
{
// ProcessData returns a List<IwrRows>
processedData.AddRange(ProcessData(item));
}
不确定我在这里遗漏了什么。
结果 - 51112
Foreach returns 返回 41316 行。
ForeachParallel returns 41308 或 41313 或 41314 各不相同 运行
您的问题似乎出在:AddRows(localData, processedData, obj)。此方法可能将数据添加到非线程安全的列表中。您应该将 添加到线程安全列表或围绕添加数据进行一些同步。
您似乎很难处理结果并将它们重新整理成一个连贯的列表。您可以使用 PLinQ,因此您不必担心结果容器是线程安全的:
var processedData = yourData.AsParallel().Select(ProcessData).ToList();
我认为在 2) 中使用 await Task.Run
没用。
如果 Foreach returns 41316 rows back
对于 Results - 51112
问题不在 Parallel.ForEach
中,而是在您的添加/处理机制中。请记住,即使 ConcurrentBag
保证其上的每个操作都是线程安全的,它也不会处理重复项。
嗯,您的业务逻辑 (ProcessData) 肯定有问题。
也许不是 pararell.foreach,但我认为这个可能会加快您的代码速度,它也是异步的,使用 LINQ。
就是这样,我正在处理一些数据的并行异步操作。
您可能需要展平 taskList 的结果(它的完整伪代码是从头编写的)。您始终可以始终使用 yield return 来稍后具体化您的列表,这可能会更加固定它。但谨慎使用 yield :)
var taskList = results.Select(async item =>
{
return await ProcessData(item, processedData);
});
await Task.WhenAll(taskList);
根据需要使用 WhenAll 或 WaitAll
Task.WaitAll:
At least one of the Task instances was canceled -or- an exception was thrown during the execution of
at least one of the Task instances.If a task was canceled, the AggregateException contains an
OperationCanceledException in its InnerExceptions collection.
Task.WhenAll:
If any of the supplied tasks completes in a faulted state, the returned task will also complete in a
Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions
from each of the supplied tasks.
Parallel.ForEach 有助于提高性能,但是,我发现数据丢失了。
已尝试 - 变量结果、处理数据为 ConcurrentBag<IwrRows>
1)
Parallel.ForEach(results, () => new ConcurrentBag<IwrRows>(), (n, loopState, localData) =>
{
return ProcessData(n); // ProcessData complicated business logic
}, (localData) => AddRows(localData, processedData, obj)
);
2)
await Task.Run(() => Parallel.ForEach(results, item =>
{
ProcessData(item, processedData);
}));
3)
Parallel.ForEach(results, item =>
{
ProcessData(item, processedData);
});
他们都丢失了一些行。
当我使用 foreach 块时,它 returns 始终是相同的值,但是,它慢了 4 倍。
foreach (var item in results)
{
// ProcessData returns a List<IwrRows>
processedData.AddRange(ProcessData(item));
}
不确定我在这里遗漏了什么。
结果 - 51112 Foreach returns 返回 41316 行。 ForeachParallel returns 41308 或 41313 或 41314 各不相同 运行
您的问题似乎出在:AddRows(localData, processedData, obj)。此方法可能将数据添加到非线程安全的列表中。您应该将 添加到线程安全列表或围绕添加数据进行一些同步。
您似乎很难处理结果并将它们重新整理成一个连贯的列表。您可以使用 PLinQ,因此您不必担心结果容器是线程安全的:
var processedData = yourData.AsParallel().Select(ProcessData).ToList();
我认为在 2) 中使用 await Task.Run
没用。
如果 Foreach returns 41316 rows back
对于 Results - 51112
问题不在 Parallel.ForEach
中,而是在您的添加/处理机制中。请记住,即使 ConcurrentBag
保证其上的每个操作都是线程安全的,它也不会处理重复项。
嗯,您的业务逻辑 (ProcessData) 肯定有问题。
也许不是 pararell.foreach,但我认为这个可能会加快您的代码速度,它也是异步的,使用 LINQ。
就是这样,我正在处理一些数据的并行异步操作。
您可能需要展平 taskList 的结果(它的完整伪代码是从头编写的)。您始终可以始终使用 yield return 来稍后具体化您的列表,这可能会更加固定它。但谨慎使用 yield :)
var taskList = results.Select(async item =>
{
return await ProcessData(item, processedData);
});
await Task.WhenAll(taskList);
根据需要使用 WhenAll 或 WaitAll
Task.WaitAll:
At least one of the Task instances was canceled -or- an exception was thrown during the execution of
at least one of the Task instances.If a task was canceled, the AggregateException contains an
OperationCanceledException in its InnerExceptions collection.
Task.WhenAll:
If any of the supplied tasks completes in a faulted state, the returned task will also complete in a
Faulted state, where its exceptions will contain the aggregation of the set of unwrapped exceptions
from each of the supplied tasks.