C# Parallel.For 和非初始化数组

C# Parallel.For and non-initialized arrays

场景是这样的:在一个Parallel.For中一个数组被用在一个非并行的for中。数组的所有元素都被覆盖,因此在技术上没有必要分配和初始化它(据我从 C# 教程中推断,这总是在构造时发生):

float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0,16384,x =>
{
   int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
   for (int i = 0; i < histogram.Length; i++)
   {
      histogram[i] = some_func(); // each element in histogram[] is written anew
   }
   result[x] = do_something_with(histogram);
});

顺序代码中的解决方案很简单:将数组拉到外部 for 循环的前面:

float[] result = new float[16384];
int[] histogram = new int[32768]; // allocation and initialization with      
for(x = 0; x < 16384; x++)
{
   for (int i = 0; i < histogram.Length; i++)
   {
      histogram[i] = some_func(); 
   }
   restult[x] = do_something_with(histogram);
}

现在在外循环中既没有发生分配也没有徒劳的 0-ing。 不过在并行版本中,这肯定是一个糟糕的举动,要么并行进程正在破坏彼此的直方图结果,要么 C# 足够聪明地锁定 histogram 从而关闭任何并行性。分配 histogram[16384,32768] 同样是浪费。我现在正在尝试的是

public static ParallelLoopResult For<TLocal>(
    int fromInclusive,
    int toExclusive,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally
)

库构造(函数?),但由于这是我第一次尝试使用 C# 进行并行编程,所以我充满了疑问。以下是顺序案例的正确翻译吗?

float[] result = new float[16384];
System.Threading.Tasks.Parallel.For<short[]>(0, 16384, 
                                             () => new short[32768], 
                                             (x, loopState, histogram) =>
{
    for (int i = 0; i < histogram.Length; i++)
    {
       histogram[i] = some_func(); 
    }
    result[x] = do_something_with(histogram);
    return histogram;
}, (histogram) => { });

你走在正确的轨道上。

循环的 16K 次迭代将由少量线程处理。如果您在循环外捕获一个局部变量,那么每次迭代都会共享同一个对象。如果您声明一个本地对象并在循环内分配一个对象,那么每个对象将有 16K 个分配。

有一个可以分配对象的中间位置,称为 Thread Local Variables

它主要用于您正在累积全局结果但希望最小化同步开销的场景。因此,运行循环迭代的每个线程都会获得自己的状态副本,然后在最后可以聚合它们。

但是使用它在多个循环迭代中重用缓冲区也很好。

我不太确定你的要求,但让我们看看起点:

public void Original()
{
    float[] result = new float[16384];
    System.Threading.Tasks.Parallel.For(0, 16384, x =>
    {
        int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
        for (int i = 0; i < histogram.Length; i++)
        {
            histogram[i] = some_func(); // each element in histogram[] is written anew
        }
        result[x] = do_something_with(histogram);
    });
}

内部循环生成一个 histogram,而外部循环使用一个 histogram 并使用它在 Results.

中生成单个值

一种易于操作的解决方案是执行此处理TPL-Dataflow,这是 TPL 之上的抽象。要进行设置,我们需要一些 DTO 来通过数据流管道。

public class HistogramWithIndex
{
    public HistogramWithIndex(IEnumerable<int> histogram, int index)
    {
        Histogram = histogram;
        Index = index;
    }
    public IEnumerable<int> Histogram { get; }
    public int Index { get; }
}

public class IndexWithHistogramSize
{
    public IndexWithHistogramSize(int index, int histogramSize)
    {
        Index = index;
        HistogramSize = histogramSize;
    }
    public int Index { get; }
    public int HistogramSize { get; }
}

这些 类 代表您在不同处理阶段的数据。现在让我们看看管道。

public async Task Dataflow()
{
    //Build our pipeline
    var options = new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        //This is default but I want to point it out
        EnsureOrdered = true
    };
    var buildHistorgramBlock = new TransformBlock<IndexWithHistogramSize, HistogramWithIndex>(inputData =>
    {
        var histogram = Enumerable.Range(0, inputData.HistogramSize).Select(_ => some_func());
        return new HistogramWithIndex(histogram, inputData.Index);
    }, options);
    var doSomethingBlock = new TransformBlock<HistogramWithIndex, int>(x => do_something_with(x.Histogram.ToArray()), options);

    var resultBlock1 = new ActionBlock<int>(x => Results1.Add(x), options);
    //var resultBlock2 = new ActionBlock<int>(x => //insert into list with index, options);

    //link the blocks
    buildHistorgramBlock.LinkTo(doSomethingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    doSomethingBlock.LinkTo(resultBlock1, new DataflowLinkOptions() { PropagateCompletion = true });

    //Post data
    var histogramSize = 32768;
    foreach (var index in Enumerable.Range(0, 16384))
    {
        await buildHistorgramBlock.SendAsync(new IndexWithHistogramSize(index, histogramSize));
    }

    buildHistorgramBlock.Complete();
    await resultBlock1.Completion;
} 

由两个 TransformBLocksActionBlock 组成的块形成了一个链接的管道。这里的好处是,改变并行度、引入背压的每个块的有界容量等等变得非常容易。

重要说明:TransformBlocks,如果使用并行性,即 MDOP >1,那么他们将按照收到的顺序输出他们的项目。这意味着如果他们按顺序进来,他们就会按顺序离开。您还可以使用块选项 Ensure Ordering 关闭排序。如果您希望您的项目在特定索引 without/with 特定排序中,这就会发挥作用。

这可能看起来有点矫枉过正,可能适合您的项目。但我发现这非常灵活且易于维护。尤其是当您开始向处理链中添加步骤时,添加一个块比将另一个 for 循环环绕在所有内容上要干净得多。

这是 c&p 的其余样板代码

private ConcurrentBag<int> Results1 = new ConcurrentBag<int>();
private int some_func() => 1;
private int do_something_with(int[] i) => i.First();

您在任务和线程之间共享的越少,更容易并行化您的代码。

如果您想减少分配,您可以重复使用缓冲区。这不仅会减少分配,还会减少昂贵的 垃圾收集 。但是,不要 将它们存储在线程本地状态中,否则您将不得不自己处理分配、重新分配和清除它们。

Parallel.ForEach 使用 tasks,而不是线程,这意味着服务于一个任务的线程最终可能服务于完全不相关的东西,但保留了一个没人需要的缓冲区。此外,Parallel.Foreach 可以配置为 回收 任务,以......清理累积的状态。在这种情况下,您必须从线程状态中清除缓冲区并重新初始化它们。

将 memory/buffer 池用于 "rent" 现成的缓冲区并在完成后 "release" 使用它们更容易,性能更高。

过去,您可以使用 BufferManager class 提供现成的 byte[] 数组,并被 WCF 用来重用缓冲区。

如今更好的选择是使用 System.Buffers 包中的 ArrayPool,它可以 return 任何类型的数组:

var pool = ArrayPool<int>.Shared;
var bufferLength = 32768;

var result = new float[16384];
Parallel.For(0,16384,x =>
{
   try
   {
       var histogram = pool.Rent(bufferLength);
       for (int i = 0; i < bufferLength; i++)
       {
          histogram[i] = some_func(); 
       }
       result[x] = do_something_with(histogram);
    }
    finally
    {
       //Ensure the buffer is returned even in case of error
       pool.Return(histogram);
    }
});

您指定的尺寸是最小尺寸。如果没有匹配请求的可用缓冲区,池可以 return 更大,这就是为什么你必须在循环中使用 bufferLength

在大多数情况下使用共享池应该没问题。它的默认最大数组大小为 1MB (1 048 576),每个大小桶有 50 个数组。 ArrayPool 将相同大小的缓冲区组织在桶中以便更快地访问。

如果需要,您可以创建另一个具有特定最大缓冲区大小的池。创建数组时可以指定每个桶中最多可以放置多少个数组,eg:

var maxLength = 32768;
var pool = ArrayPool<int>.Create(maxLength,1024);
var bufferLength = 32768;

ArrayPool的代码并不复杂。你可以看看here