C# Parallel.For 和非初始化数组
C# Parallel.For and non-initialized arrays
场景是这样的:在一个Parallel.For中一个数组被用在一个非并行的for中。数组的所有元素都被覆盖,因此在技术上没有必要分配和初始化它(据我从 C# 教程中推断,这总是在构造时发生):
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0,16384,x =>
{
int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func(); // each element in histogram[] is written anew
}
result[x] = do_something_with(histogram);
});
顺序代码中的解决方案很简单:将数组拉到外部 for 循环的前面:
float[] result = new float[16384];
int[] histogram = new int[32768]; // allocation and initialization with
for(x = 0; x < 16384; x++)
{
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func();
}
restult[x] = do_something_with(histogram);
}
现在在外循环中既没有发生分配也没有徒劳的 0-ing。
不过在并行版本中,这肯定是一个糟糕的举动,要么并行进程正在破坏彼此的直方图结果,要么 C# 足够聪明地锁定 histogram
从而关闭任何并行性。分配 histogram[16384,32768]
同样是浪费。我现在正在尝试的是
public static ParallelLoopResult For<TLocal>(
int fromInclusive,
int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
库构造(函数?),但由于这是我第一次尝试使用 C# 进行并行编程,所以我充满了疑问。以下是顺序案例的正确翻译吗?
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For<short[]>(0, 16384,
() => new short[32768],
(x, loopState, histogram) =>
{
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func();
}
result[x] = do_something_with(histogram);
return histogram;
}, (histogram) => { });
你走在正确的轨道上。
循环的 16K 次迭代将由少量线程处理。如果您在循环外捕获一个局部变量,那么每次迭代都会共享同一个对象。如果您声明一个本地对象并在循环内分配一个对象,那么每个对象将有 16K 个分配。
有一个可以分配对象的中间位置,称为 Thread Local Variables。
它主要用于您正在累积全局结果但希望最小化同步开销的场景。因此,运行循环迭代的每个线程都会获得自己的状态副本,然后在最后可以聚合它们。
但是使用它在多个循环迭代中重用缓冲区也很好。
我不太确定你的要求,但让我们看看起点:
public void Original()
{
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0, 16384, x =>
{
int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func(); // each element in histogram[] is written anew
}
result[x] = do_something_with(histogram);
});
}
内部循环生成一个 histogram
,而外部循环使用一个 histogram
并使用它在 Results
.
中生成单个值
一种易于操作的解决方案是执行此处理TPL-Dataflow,这是 TPL 之上的抽象。要进行设置,我们需要一些 DTO 来通过数据流管道。
public class HistogramWithIndex
{
public HistogramWithIndex(IEnumerable<int> histogram, int index)
{
Histogram = histogram;
Index = index;
}
public IEnumerable<int> Histogram { get; }
public int Index { get; }
}
public class IndexWithHistogramSize
{
public IndexWithHistogramSize(int index, int histogramSize)
{
Index = index;
HistogramSize = histogramSize;
}
public int Index { get; }
public int HistogramSize { get; }
}
这些 类 代表您在不同处理阶段的数据。现在让我们看看管道。
public async Task Dataflow()
{
//Build our pipeline
var options = new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
//This is default but I want to point it out
EnsureOrdered = true
};
var buildHistorgramBlock = new TransformBlock<IndexWithHistogramSize, HistogramWithIndex>(inputData =>
{
var histogram = Enumerable.Range(0, inputData.HistogramSize).Select(_ => some_func());
return new HistogramWithIndex(histogram, inputData.Index);
}, options);
var doSomethingBlock = new TransformBlock<HistogramWithIndex, int>(x => do_something_with(x.Histogram.ToArray()), options);
var resultBlock1 = new ActionBlock<int>(x => Results1.Add(x), options);
//var resultBlock2 = new ActionBlock<int>(x => //insert into list with index, options);
//link the blocks
buildHistorgramBlock.LinkTo(doSomethingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
doSomethingBlock.LinkTo(resultBlock1, new DataflowLinkOptions() { PropagateCompletion = true });
//Post data
var histogramSize = 32768;
foreach (var index in Enumerable.Range(0, 16384))
{
await buildHistorgramBlock.SendAsync(new IndexWithHistogramSize(index, histogramSize));
}
buildHistorgramBlock.Complete();
await resultBlock1.Completion;
}
由两个 TransformBLocks
和 ActionBlock
组成的块形成了一个链接的管道。这里的好处是,改变并行度、引入背压的每个块的有界容量等等变得非常容易。
重要说明:TransformBlocks
,如果使用并行性,即 MDOP >1,那么他们将按照收到的顺序输出他们的项目。这意味着如果他们按顺序进来,他们就会按顺序离开。您还可以使用块选项 Ensure Ordering
关闭排序。如果您希望您的项目在特定索引 without/with 特定排序中,这就会发挥作用。
这可能看起来有点矫枉过正,可能适合您的项目。但我发现这非常灵活且易于维护。尤其是当您开始向处理链中添加步骤时,添加一个块比将另一个 for 循环环绕在所有内容上要干净得多。
这是 c&p 的其余样板代码
private ConcurrentBag<int> Results1 = new ConcurrentBag<int>();
private int some_func() => 1;
private int do_something_with(int[] i) => i.First();
您在任务和线程之间共享的越少,更容易并行化您的代码。
如果您想减少分配,您可以重复使用缓冲区。这不仅会减少分配,还会减少昂贵的 垃圾收集 。但是,不要 将它们存储在线程本地状态中,否则您将不得不自己处理分配、重新分配和清除它们。
Parallel.ForEach
使用 tasks,而不是线程,这意味着服务于一个任务的线程最终可能服务于完全不相关的东西,但保留了一个没人需要的缓冲区。此外,Parallel.Foreach
可以配置为 回收 任务,以......清理累积的状态。在这种情况下,您必须从线程状态中清除缓冲区并重新初始化它们。
将 memory/buffer 池用于 "rent" 现成的缓冲区并在完成后 "release" 使用它们更容易,性能更高。
过去,您可以使用 BufferManager class 提供现成的 byte[]
数组,并被 WCF 用来重用缓冲区。
如今更好的选择是使用 System.Buffers 包中的 ArrayPool,它可以 return 任何类型的数组:
var pool = ArrayPool<int>.Shared;
var bufferLength = 32768;
var result = new float[16384];
Parallel.For(0,16384,x =>
{
try
{
var histogram = pool.Rent(bufferLength);
for (int i = 0; i < bufferLength; i++)
{
histogram[i] = some_func();
}
result[x] = do_something_with(histogram);
}
finally
{
//Ensure the buffer is returned even in case of error
pool.Return(histogram);
}
});
您指定的尺寸是最小尺寸。如果没有匹配请求的可用缓冲区,池可以 return 更大,这就是为什么你必须在循环中使用 bufferLength
。
在大多数情况下使用共享池应该没问题。它的默认最大数组大小为 1MB (1 048 576),每个大小桶有 50 个数组。 ArrayPool 将相同大小的缓冲区组织在桶中以便更快地访问。
如果需要,您可以创建另一个具有特定最大缓冲区大小的池。创建数组时可以指定每个桶中最多可以放置多少个数组,eg:
var maxLength = 32768;
var pool = ArrayPool<int>.Create(maxLength,1024);
var bufferLength = 32768;
ArrayPool
的代码并不复杂。你可以看看here
场景是这样的:在一个Parallel.For中一个数组被用在一个非并行的for中。数组的所有元素都被覆盖,因此在技术上没有必要分配和初始化它(据我从 C# 教程中推断,这总是在构造时发生):
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0,16384,x =>
{
int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func(); // each element in histogram[] is written anew
}
result[x] = do_something_with(histogram);
});
顺序代码中的解决方案很简单:将数组拉到外部 for 循环的前面:
float[] result = new float[16384];
int[] histogram = new int[32768]; // allocation and initialization with
for(x = 0; x < 16384; x++)
{
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func();
}
restult[x] = do_something_with(histogram);
}
现在在外循环中既没有发生分配也没有徒劳的 0-ing。
不过在并行版本中,这肯定是一个糟糕的举动,要么并行进程正在破坏彼此的直方图结果,要么 C# 足够聪明地锁定 histogram
从而关闭任何并行性。分配 histogram[16384,32768]
同样是浪费。我现在正在尝试的是
public static ParallelLoopResult For<TLocal>(
int fromInclusive,
int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
库构造(函数?),但由于这是我第一次尝试使用 C# 进行并行编程,所以我充满了疑问。以下是顺序案例的正确翻译吗?
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For<short[]>(0, 16384,
() => new short[32768],
(x, loopState, histogram) =>
{
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func();
}
result[x] = do_something_with(histogram);
return histogram;
}, (histogram) => { });
你走在正确的轨道上。
循环的 16K 次迭代将由少量线程处理。如果您在循环外捕获一个局部变量,那么每次迭代都会共享同一个对象。如果您声明一个本地对象并在循环内分配一个对象,那么每个对象将有 16K 个分配。
有一个可以分配对象的中间位置,称为 Thread Local Variables。
它主要用于您正在累积全局结果但希望最小化同步开销的场景。因此,运行循环迭代的每个线程都会获得自己的状态副本,然后在最后可以聚合它们。
但是使用它在多个循环迭代中重用缓冲区也很好。
我不太确定你的要求,但让我们看看起点:
public void Original()
{
float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0, 16384, x =>
{
int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
for (int i = 0; i < histogram.Length; i++)
{
histogram[i] = some_func(); // each element in histogram[] is written anew
}
result[x] = do_something_with(histogram);
});
}
内部循环生成一个 histogram
,而外部循环使用一个 histogram
并使用它在 Results
.
一种易于操作的解决方案是执行此处理TPL-Dataflow,这是 TPL 之上的抽象。要进行设置,我们需要一些 DTO 来通过数据流管道。
public class HistogramWithIndex
{
public HistogramWithIndex(IEnumerable<int> histogram, int index)
{
Histogram = histogram;
Index = index;
}
public IEnumerable<int> Histogram { get; }
public int Index { get; }
}
public class IndexWithHistogramSize
{
public IndexWithHistogramSize(int index, int histogramSize)
{
Index = index;
HistogramSize = histogramSize;
}
public int Index { get; }
public int HistogramSize { get; }
}
这些 类 代表您在不同处理阶段的数据。现在让我们看看管道。
public async Task Dataflow()
{
//Build our pipeline
var options = new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
//This is default but I want to point it out
EnsureOrdered = true
};
var buildHistorgramBlock = new TransformBlock<IndexWithHistogramSize, HistogramWithIndex>(inputData =>
{
var histogram = Enumerable.Range(0, inputData.HistogramSize).Select(_ => some_func());
return new HistogramWithIndex(histogram, inputData.Index);
}, options);
var doSomethingBlock = new TransformBlock<HistogramWithIndex, int>(x => do_something_with(x.Histogram.ToArray()), options);
var resultBlock1 = new ActionBlock<int>(x => Results1.Add(x), options);
//var resultBlock2 = new ActionBlock<int>(x => //insert into list with index, options);
//link the blocks
buildHistorgramBlock.LinkTo(doSomethingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
doSomethingBlock.LinkTo(resultBlock1, new DataflowLinkOptions() { PropagateCompletion = true });
//Post data
var histogramSize = 32768;
foreach (var index in Enumerable.Range(0, 16384))
{
await buildHistorgramBlock.SendAsync(new IndexWithHistogramSize(index, histogramSize));
}
buildHistorgramBlock.Complete();
await resultBlock1.Completion;
}
由两个 TransformBLocks
和 ActionBlock
组成的块形成了一个链接的管道。这里的好处是,改变并行度、引入背压的每个块的有界容量等等变得非常容易。
重要说明:TransformBlocks
,如果使用并行性,即 MDOP >1,那么他们将按照收到的顺序输出他们的项目。这意味着如果他们按顺序进来,他们就会按顺序离开。您还可以使用块选项 Ensure Ordering
关闭排序。如果您希望您的项目在特定索引 without/with 特定排序中,这就会发挥作用。
这可能看起来有点矫枉过正,可能适合您的项目。但我发现这非常灵活且易于维护。尤其是当您开始向处理链中添加步骤时,添加一个块比将另一个 for 循环环绕在所有内容上要干净得多。
这是 c&p 的其余样板代码
private ConcurrentBag<int> Results1 = new ConcurrentBag<int>();
private int some_func() => 1;
private int do_something_with(int[] i) => i.First();
您在任务和线程之间共享的越少,更容易并行化您的代码。
如果您想减少分配,您可以重复使用缓冲区。这不仅会减少分配,还会减少昂贵的 垃圾收集 。但是,不要 将它们存储在线程本地状态中,否则您将不得不自己处理分配、重新分配和清除它们。
Parallel.ForEach
使用 tasks,而不是线程,这意味着服务于一个任务的线程最终可能服务于完全不相关的东西,但保留了一个没人需要的缓冲区。此外,Parallel.Foreach
可以配置为 回收 任务,以......清理累积的状态。在这种情况下,您必须从线程状态中清除缓冲区并重新初始化它们。
将 memory/buffer 池用于 "rent" 现成的缓冲区并在完成后 "release" 使用它们更容易,性能更高。
过去,您可以使用 BufferManager class 提供现成的 byte[]
数组,并被 WCF 用来重用缓冲区。
如今更好的选择是使用 System.Buffers 包中的 ArrayPool,它可以 return 任何类型的数组:
var pool = ArrayPool<int>.Shared;
var bufferLength = 32768;
var result = new float[16384];
Parallel.For(0,16384,x =>
{
try
{
var histogram = pool.Rent(bufferLength);
for (int i = 0; i < bufferLength; i++)
{
histogram[i] = some_func();
}
result[x] = do_something_with(histogram);
}
finally
{
//Ensure the buffer is returned even in case of error
pool.Return(histogram);
}
});
您指定的尺寸是最小尺寸。如果没有匹配请求的可用缓冲区,池可以 return 更大,这就是为什么你必须在循环中使用 bufferLength
。
在大多数情况下使用共享池应该没问题。它的默认最大数组大小为 1MB (1 048 576),每个大小桶有 50 个数组。 ArrayPool 将相同大小的缓冲区组织在桶中以便更快地访问。
如果需要,您可以创建另一个具有特定最大缓冲区大小的池。创建数组时可以指定每个桶中最多可以放置多少个数组,eg:
var maxLength = 32768;
var pool = ArrayPool<int>.Create(maxLength,1024);
var bufferLength = 32768;
ArrayPool
的代码并不复杂。你可以看看here