C#: How to partition a Parallel.ForEach loop to iterate my list

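I'm new to programming. I'm about to graduate and I'm also learning .NET.

I want to iterate my list with Parallel.ForEach, but I want to use partitioning there. I don't know enough yet, so my code does not compile.

I actually did it this way first, and it worked fine.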

Parallel.ForEach(MyBroker, broker =>
{

    mybrow = new WeightageRowNumber();
    mybrow.RowNumber = Interlocked.Increment(ref rowNumber);

    lock (_lock)
    {
      Mylist.Add(mybrow);
    }
});

Now I want to use partitioning, so I changed my code as follows, but it no longer compiles. Here is the code:

Parallel.ForEach(MyBroker, broker,
  (j, loop, subtotal) =>
{
    mybrow = new WeightageRowNumber();
    mybrow.RowNumber = Interlocked.Increment(ref rowNumber);

    lock (_lock)
    {
       Mylist.Add(mybrow);
    }
    return brokerRowWeightageRowNumber.RowNumber;

},

(finalResult) =>
    var rownum= Interlocked.Increment(ref finalResult); 
    console.writeline(rownum);
);

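Please look at my second block of code and show me how to restructure it so that it uses Parallel.ForEach partitioning to iterate my list.

Please guide me. Thanks.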

The Parallel.ForEach method has 20 overloads. Perhaps try a different overload?
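For instance, the shape of the second snippet in the question (a body taking three arguments, followed by a final delegate) resembles the thread-local-state overload, Parallel.ForEach<TSource, TLocal>(source, localInit, body, localFinally). Below is a minimal sketch of that overload. The WeightageRowNumber class and the list of strings standing in for MyBroker are hypothetical placeholders, since the real types are not shown in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the WeightageRowNumber type in the question.
public class WeightageRowNumber
{
    public int RowNumber { get; set; }
}

public static class LocalStateDemo
{
    // Builds one row per broker. Each worker collects rows in a private list
    // and merges that list into the shared result once, in localFinally.
    public static List<WeightageRowNumber> BuildRows(IEnumerable<string> brokers)
    {
        var rows = new List<WeightageRowNumber>();
        var gate = new object();
        int rowNumber = 0;

        Parallel.ForEach(
            brokers,
            // localInit: runs once per worker task and creates its private state.
            () => new List<WeightageRowNumber>(),
            // body: runs once per item; the private list needs no lock.
            (broker, loopState, localRows) =>
            {
                var row = new WeightageRowNumber
                {
                    RowNumber = Interlocked.Increment(ref rowNumber)
                };
                localRows.Add(row);
                return localRows;
            },
            // localFinally: runs once per worker task; merge under a lock.
            localRows =>
            {
                lock (gate)
                {
                    rows.AddRange(localRows);
                }
            });

        return rows;
    }

    public static void Main()
    {
        var brokers = Enumerable.Range(0, 1000).Select(i => "broker" + i);
        Console.WriteLine(BuildRows(brokers).Count);
    }
}
```

Compared with locking on every Add, this takes the lock once per worker rather than once per item. The order of rows in the result is not guaranteed.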

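Without your dependencies I can't give a one-to-one example for your implementation, but here is an in-depth example (reformatted from the linked source) that you can copy and step through with debug breakpoints, if that is useful. Unfortunately, there doesn't seem to be a simple, directly instantiable implementation of OrderablePartitioner, so apologies for all the boilerplate: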

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections;
using System.Linq;

// Simple partitioner that will extract one (index,item) pair at a time, 
// in a thread-safe fashion, from the underlying collection.
class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
{
    // The collection being wrapped by this Partitioner
    IEnumerable<T> m_referenceEnumerable;

    // Class used to wrap m_index for the purpose of sharing access to it
    // between an InternalEnumerable and multiple InternalEnumerators
    private class Shared<U>
    {
        internal U Value;
        public Shared(U item)
        {
            Value = item;
        }
    }

    // Internal class that serves as a shared enumerable for the
    // underlying collection.
    private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
    {
        IEnumerator<T> m_reader;
        bool m_disposed = false;
        Shared<long> m_index = null;

        // These two are used to implement Dispose() when static partitioning is being performed
        int m_activeEnumerators;
        bool m_downcountEnumerators;

        // "downcountEnumerators" will be true for static partitioning, false for
        // dynamic partitioning.  
        public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
        {
            m_reader = reader;
            m_index = new Shared<long>(0);
            m_activeEnumerators = 0;
            m_downcountEnumerators = downcountEnumerators;
        }

        public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
        {
            if (m_disposed)
                throw new ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing");

            // For static partitioning, keep track of the number of active enumerators.
            if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators);

            return new InternalEnumerator(m_reader, this, m_index);
        }

        IEnumerator<KeyValuePair<long, T>> IEnumerable<KeyValuePair<long, T>>.GetEnumerator()
        {
            return this.GetEnumerator();
        }

        public void Dispose()
        {
            if (!m_disposed)
            {
                // Only dispose the source enumerator if you are doing dynamic partitioning
                if (!m_downcountEnumerators)
                {
                    m_reader.Dispose();
                }
                m_disposed = true;
            }
        }

        // Called from Dispose() method of spawned InternalEnumerator.  During
        // static partitioning, the source enumerator will be automatically
        // disposed once all requested InternalEnumerators have been disposed.
        public void DisposeEnumerator()
        {
            if (m_downcountEnumerators)
            {
                if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
                {
                    m_reader.Dispose();
                }
            }
        }

        // Non-generic enumeration simply forwards to the generic enumerator.
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    // Internal class that serves as a shared enumerator for 
    // the underlying collection.
    private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
    {
        KeyValuePair<long, T> m_current;
        IEnumerator<T> m_source;
        InternalEnumerable m_controllingEnumerable;
        Shared<long> m_index = null;
        bool m_disposed = false;

        public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
        {
            m_source = source;
            m_current = default(KeyValuePair<long, T>);
            m_controllingEnumerable = controllingEnumerable;
            m_index = index;
        }

        object IEnumerator.Current
        {
            get { return m_current; }
        }

        KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
        {
            get { return m_current; }
        }

        void IEnumerator.Reset()
        {
            throw new NotSupportedException("Reset() not supported");
        }

        // This method is the crux of this class.  Under lock, it calls
        // MoveNext() on the underlying enumerator, grabs Current and index, 
        // and increments the index.
        bool IEnumerator.MoveNext()
        {
            bool rval = false;
            lock (m_source)
            {
                rval = m_source.MoveNext();
                if (rval)
                {
                    m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
                    m_index.Value = m_index.Value + 1;
                }
                else m_current = default(KeyValuePair<long, T>);
            }
            return rval;
        }

        void IDisposable.Dispose()
        {
            if (!m_disposed)
            {
                // Delegate to parent enumerable's DisposeEnumerator() method
                m_controllingEnumerable.DisposeEnumerator();
                m_disposed = true;
            }
        }
    }

    // Constructor just grabs the collection to wrap
    public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
        : base(true, true, true)
    {
        // Verify that the source IEnumerable is not null
        if (enumerable == null)
            throw new ArgumentNullException("enumerable");

        m_referenceEnumerable = enumerable;
    }

    // Produces a list of "numPartitions" IEnumerators that can each be
    // used to traverse the underlying collection in a thread-safe manner.
    // This will return a static number of enumerators, as opposed to
    // GetOrderableDynamicPartitions(), the result of which can be used to produce
    // any number of enumerators.
    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
    {
        if (numPartitions < 1)
            throw new ArgumentOutOfRangeException(nameof(numPartitions));

        List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);

        // Since we are doing static partitioning, create an InternalEnumerable with reference
        // counting of spawned InternalEnumerators turned on.  Once all of the spawned enumerators
        // are disposed, dynamicPartitions will be disposed.
        var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
        for (int i = 0; i < numPartitions; i++)
            list.Add(dynamicPartitions.GetEnumerator());

        return list;
    }

    // Returns an instance of our internal Enumerable class.  GetEnumerator()
    // can then be called on that (multiple times) to produce shared enumerators.
    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        // Since we are doing dynamic partitioning, create an InternalEnumerable with reference
        // counting of spawned InternalEnumerators turned off.  This returned InternalEnumerable
        // will need to be explicitly disposed.
        return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
    }

    // Must be set to true if GetDynamicPartitions() is supported.
    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}

Here is an example of how to construct a Parallel.ForEach with the OrderablePartitioner above. Do you see how you can refactor your finally block out of the ForEach implementation entirely?

public class Program
{
    static void Main(string[] args)
    {
        //
        // First a fairly simple visual test
        //
        var someCollection = new string[] { "four", "score", "and", "twenty", "years", "ago" };
        var someOrderablePartitioner = new SingleElementOrderablePartitioner<string>(someCollection);
        Parallel.ForEach(someOrderablePartitioner, (item, state, index) =>
        {
            Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", item, index, Thread.CurrentThread.ManagedThreadId);
        });

        //
        // Now a more rigorous test of dynamic partitioning (used by Parallel.ForEach)
        //
        List<int> src = Enumerable.Range(0, 100000).ToList();
        SingleElementOrderablePartitioner<int> myOP = new SingleElementOrderablePartitioner<int>(src);

        int counter = 0;
        bool mismatch = false;
        Parallel.ForEach(myOP, (item, state, index) =>
        {
            if (item != index) mismatch = true;
            Interlocked.Increment(ref counter);
        });

        if (mismatch) Console.WriteLine("OrderablePartitioner Test: index mismatch detected");
        Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", counter);
    }
}
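If a custom partitioner is more than you need, the built-in Partitioner.Create factory in System.Collections.Concurrent may be enough. The sketch below uses range partitioning over list indexes; the class and method names (RangePartitionDemo, SumInRanges) are made up for illustration, and summing integers is only a placeholder for your per-item work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RangePartitionDemo
{
    // Sums a list by handing each worker a contiguous range of indexes.
    public static long SumInRanges(IList<int> source)
    {
        // Partitioner.Create(from, to) rejects an empty range, so guard it.
        if (source.Count == 0) return 0;

        long total = 0;

        // Yields Tuple<int, int> ranges: Item1 is inclusive, Item2 is exclusive.
        var ranges = Partitioner.Create(0, source.Count);

        Parallel.ForEach(ranges, range =>
        {
            long subtotal = 0;
            for (int i = range.Item1; i < range.Item2; i++)
            {
                subtotal += source[i];
            }
            // One atomic add per range instead of one per element.
            Interlocked.Add(ref total, subtotal);
        });

        return total;
    }

    public static void Main()
    {
        List<int> src = Enumerable.Range(0, 100000).ToList();
        Console.WriteLine(SumInRanges(src));
    }
}
```

Partitioner.Create also has overloads that accept an IList<T> or IEnumerable<T> and return an OrderablePartitioner<T>, which can be passed to the same indexed Parallel.ForEach overload used in the test program above.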

This link may also be useful ("Write a simple Parallel.ForEach loop").