Parallel.Foreach 使用模分区

Parallel.Foreach with modulo partitioning

当使用 4 个线程对 100 个项目使用 Parallel.Foreach() 时,它会将列表分成 4 个项目块 (0-24,25-49,50-74,75-99),这意味着,并行处理项目 0、25、50 和 75。

是否有可能以模数方式对项目进行分区以首先处理索引较低的项目?喜欢:

Thread 1: 0, 5, 9,..
Thread 2: 1, 6, 10,...
Thread 3: 2, 7, 11,...
Thread 4: 3, 8, 12,...

这种分区方法称为 Round Robin 或 Striping。将此与 Parallel.ForEach() 一起使用的主要挑战是 ForEach() 需要分区程序支持动态分区,这对于这种类型的分区是不可能的,因为分区的数量必须在执行循环之前固定.

实现此类分区的一种方法是创建一个自定义的 class 派生自 System.Collections.Concurrent.Partitioner<TSource> 并使用 ParallelQuery.ForAll() 方法,该方法没有动态分区支持要求。对于大多数应用程序,这应该等同于使用 ForEach().

下面是自定义 Partitioner 和基本实现的示例。 Partitioner 将生成与并行度相同的分区数。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace RoundRobinPartitioning
{
    public class RoundRobinPartitioner<TSource> : Partitioner<TSource>
    {
        private readonly IList<TSource> _source;

        public RoundRobinPartitioner(IList<TSource> source)
        {
            _source = source;
        }

        public override bool SupportsDynamicPartitions { get { return false; } }

        public override IList<IEnumerator<TSource>> GetPartitions(int partitionCount)
        {
            var enumerators = new List<IEnumerator<TSource>>(partitionCount);

            for (int i = 0; i < partitionCount; i++)
            {
                enumerators.Add(GetEnumerator(i, partitionCount));
            }

            return enumerators;
        }

        private IEnumerator<TSource> GetEnumerator(
            int partition,
            int partitionCount)
        {
            int position = partition;
            TSource value;

            while (position < _source.Count)
            {
                value = _source[position];
                position += partitionCount;
                yield return value;
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var values = Enumerable.Range(0, 100).ToList();

            var partitioner = new RoundRobinPartitioner<int>(values);

            partitioner.AsParallel()
                .WithDegreeOfParallelism(4)
                .ForAll(value =>
                {
                    // Perform work here
                });
        }
    }
}