如何防止 Parallel.ForEach 循环在运行时更改任务数?
How to prevent Parallel.ForEach loop from changing the number of tasks during runtime?
我正在使用 Parallel.ForEach
循环做一些工作,我用 localInit
初始化它,如下所示:
localInit: () => new
{
foo = new Foo(),
bars = CreateBars(),
}
根据 MSDN:
localInit, or the function that initializes the thread-local variable.
This function is called once for each partition in which the
Parallel.ForEach operation executes. Our example initializes
the thread-local variable to zero.
所以我尝试那样使用它,但我观察到循环不断终止并创建新任务,这导致频繁调用 localInit
。我的选择会适得其反,无法按预期工作。
我想当 Parallel.ForEach
会创建例如四个分区时,它会让它们保持活动状态,直到它遍历所有项目,但事实并非如此。它调用 localFinally
和 localInit
数次,用于包含几千个项目的集合。怎么会?
能否以某种方式阻止这种行为?我真的很希望节省一些资源,但它并没有让我。
这是循环的样子:
var parallelLoopResult = Parallel.ForEach
(
source: items,
parallelOptions: parallelOptions,
localInit: () => new
{
foo = new Foo(),
bars = CreateBars(),
},
body: (item, loopState, i, local) =>
{
parallelOptions.CancellationToken.ThrowIfCancellationRequested();
var results = local.bars.Select(x => ...).ToList().
....
return local;
},
localFinally: local =>
{
local.foo.Dispose();
lock (aggregateLock)
{
... process transformed bars
}
}
);
并行选项:
var parallelOptions = new ParallelOptions
{
CancellationToken = cancellationTokenSource.Token,
#if DEBUG
MaxDegreeOfParallelism = 1
//MaxDegreeOfParallelism = Environment.ProcessorCount
#else
MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};
This overload 不是唯一的,所以你可以试试这个:
var bars = CreateBars();
Parallel.Foreach(bars, b => { /* your action here */};
但是如果你真的想为每个线程创建一个 bars
的副本,你可以使用 LINQ 中的一些复制方法(假设你的 bars 是一个 IEnumerable<T>
变量):
var bars = CreateBars();
localInit: () => new
{
foo = new Foo(),
bars = new List<IBar>(bars),
}
每个 线程只创建一次 执行栏。但是你知道有多少并行执行吗?并行执行引擎可以自行决定是否启动任意数量的并行执行。
如果要限制并行执行,请使用MaxDegreeOfParallelism 属性。这将对一次创建的柱数设置上限。它仍然不会控制创建的总柱数,而且总柱数可能会少于您现在的预期。
如果您想拥有明确的控制权,请手动创建任务。
如果我对 the code 的理解正确,Parallel.ForEach()
每 Task
每隔几百毫秒重新启动一次。这意味着如果每次迭代都是大量的(通常应该如此),您将得到很多 Task
,因此会有很多对 localInit
和 localFinally
的调用。这样做的原因是对于同一进程中也使用相同 ThreadPool
.
的其他代码的公平性
我认为没有办法改变 Parallel.ForEach()
的这种行为。我认为解决这个问题的一个好方法是编写你自己的 Parallel.ForEach()
的简单版本。考虑到您可以利用 Partitioner<T>
并根据您需要的 Parallel.ForEach()
的功能,它可能相对简单。例如,类似于:
public static void MyParallelForEach<TSource, TLocal>(
IEnumerable<TSource> source, int degreeOfParallelism,
Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();
Action taskAction = () =>
{
var localState = localInit();
foreach (var item in partitionerSource)
{
localState = body(item, localState);
}
localFinally(localState);
};
var tasks = new Task[degreeOfParallelism - 1];
for (int i = 0; i < degreeOfParallelism - 1; i++)
{
tasks[i] = Task.Run(taskAction);
}
taskAction();
Task.WaitAll(tasks);
}
我正在使用 Parallel.ForEach
循环做一些工作,我用 localInit
初始化它,如下所示:
localInit: () => new
{
foo = new Foo(),
bars = CreateBars(),
}
根据 MSDN:
localInit, or the function that initializes the thread-local variable. This function is called once for each partition in which the Parallel.ForEach operation executes. Our example initializes the thread-local variable to zero.
所以我尝试那样使用它,但我观察到循环不断终止并创建新任务,这导致频繁调用 localInit
。我的选择会适得其反,无法按预期工作。
我想当 Parallel.ForEach
会创建例如四个分区时,它会让它们保持活动状态,直到它遍历所有项目,但事实并非如此。它调用 localFinally
和 localInit
数次,用于包含几千个项目的集合。怎么会?
能否以某种方式阻止这种行为?我真的很希望节省一些资源,但它并没有让我。
这是循环的样子:
var parallelLoopResult = Parallel.ForEach
(
source: items,
parallelOptions: parallelOptions,
localInit: () => new
{
foo = new Foo(),
bars = CreateBars(),
},
body: (item, loopState, i, local) =>
{
parallelOptions.CancellationToken.ThrowIfCancellationRequested();
var results = local.bars.Select(x => ...).ToList().
....
return local;
},
localFinally: local =>
{
local.foo.Dispose();
lock (aggregateLock)
{
... process transformed bars
}
}
);
并行选项:
var parallelOptions = new ParallelOptions
{
CancellationToken = cancellationTokenSource.Token,
#if DEBUG
MaxDegreeOfParallelism = 1
//MaxDegreeOfParallelism = Environment.ProcessorCount
#else
MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};
This overload 不是唯一的,所以你可以试试这个:
var bars = CreateBars();
Parallel.Foreach(bars, b => { /* your action here */};
但是如果你真的想为每个线程创建一个 bars
的副本,你可以使用 LINQ 中的一些复制方法(假设你的 bars 是一个 IEnumerable<T>
变量):
var bars = CreateBars();
localInit: () => new
{
foo = new Foo(),
bars = new List<IBar>(bars),
}
每个 线程只创建一次 执行栏。但是你知道有多少并行执行吗?并行执行引擎可以自行决定是否启动任意数量的并行执行。
如果要限制并行执行,请使用MaxDegreeOfParallelism 属性。这将对一次创建的柱数设置上限。它仍然不会控制创建的总柱数,而且总柱数可能会少于您现在的预期。
如果您想拥有明确的控制权,请手动创建任务。
如果我对 the code 的理解正确,Parallel.ForEach()
每 Task
每隔几百毫秒重新启动一次。这意味着如果每次迭代都是大量的(通常应该如此),您将得到很多 Task
,因此会有很多对 localInit
和 localFinally
的调用。这样做的原因是对于同一进程中也使用相同 ThreadPool
.
我认为没有办法改变 Parallel.ForEach()
的这种行为。我认为解决这个问题的一个好方法是编写你自己的 Parallel.ForEach()
的简单版本。考虑到您可以利用 Partitioner<T>
并根据您需要的 Parallel.ForEach()
的功能,它可能相对简单。例如,类似于:
public static void MyParallelForEach<TSource, TLocal>(
IEnumerable<TSource> source, int degreeOfParallelism,
Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();
Action taskAction = () =>
{
var localState = localInit();
foreach (var item in partitionerSource)
{
localState = body(item, localState);
}
localFinally(localState);
};
var tasks = new Task[degreeOfParallelism - 1];
for (int i = 0; i < degreeOfParallelism - 1; i++)
{
tasks[i] = Task.Run(taskAction);
}
taskAction();
Task.WaitAll(tasks);
}