C# 如何划分并行 foreach 循环以迭代我的列表
C# How to partition parallel foreach loop to iterate my list
我是编程新手,目前正在攻读学位,同时也在学习 .NET。
我想用 Parallel.ForEach 并行迭代我的列表,并且希望在其中使用分区。由于知识不足,我的代码无法编译。
实际上我是先这样做的,效果很好。
Parallel.ForEach(MyBroker, broker =>
{
mybrow = new WeightageRowNumber();
mybrow.RowNumber = Interlocked.Increment(ref rowNumber);
lock (_lock)
{
Mylist.Add(mybrow);
}
});
现在我想使用分区,所以我用这种方式更改了我的代码,但现在我的代码无法编译。这是代码
Parallel.ForEach(MyBroker, broker,
(j, loop, subtotal) =>
{
mybrow = new WeightageRowNumber();
mybrow.RowNumber = Interlocked.Increment(ref rowNumber);
lock (_lock)
{
Mylist.Add(mybrow);
}
return brokerRowWeightageRowNumber.RowNumber;
},
(finalResult) =>
var rownum= Interlocked.Increment(ref finalResult);
console.writeline(rownum);
);
请查看我的第二组代码并向我展示如何重组以使用并行 foreach 的分区来迭代我的列表。
请指导我。谢谢
Parallel.ForEach 方法有 20 个重载 - 也许尝试不同的重载?
由于您没有提供相关依赖项,我无法针对您的实现给出一一对应的示例,但这里有一个较为深入的示例(根据链接中的原始示例重新排版),您可以将其复制到您的 IDE 中并设置调试断点(如果有帮助的话)。遗憾的是,要得到一个可实例化的 OrderablePartitioner 似乎并不简单,因此下面有大量样板代码,敬请见谅:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections;
using System.Linq;
/// <summary>
/// Orderable partitioner that extracts one (index, item) pair at a time,
/// in a thread-safe fashion, from the underlying collection.
/// </summary>
/// <remarks>
/// Every partition enumerator created from one call to
/// <c>GetOrderablePartitions</c> or <c>GetOrderableDynamicPartitions</c> shares a
/// single source enumerator and a single running index; both are advanced
/// under a lock inside <c>MoveNext</c>.
/// </remarks>
/// <typeparam name="T">Element type of the wrapped collection.</typeparam>
class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
{
    // The collection being wrapped by this partitioner.
    private readonly IEnumerable<T> m_referenceEnumerable;

    // Mutable box used to share the running index between an
    // InternalEnumerable and the InternalEnumerators it spawns.
    private class Shared<U>
    {
        internal U Value;

        public Shared(U item)
        {
            Value = item;
        }
    }

    // Shared enumerable over the underlying collection. Each call to
    // GetEnumerator() returns a new InternalEnumerator that reads from the
    // same source enumerator and the same index counter.
    private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
    {
        private readonly IEnumerator<T> m_reader;
        private readonly Shared<long> m_index;
        private bool m_disposed = false;

        // These two are used to implement Dispose() when static partitioning is being performed.
        private int m_activeEnumerators;
        private readonly bool m_downcountEnumerators;

        // "downcountEnumerators" is true for static partitioning and false for
        // dynamic partitioning.
        public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
        {
            m_reader = reader;
            m_index = new Shared<long>(0);
            m_activeEnumerators = 0;
            m_downcountEnumerators = downcountEnumerators;
        }

        // Implicitly implements IEnumerable<KeyValuePair<long, T>>.GetEnumerator().
        public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
        {
            if (m_disposed)
                throw new ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing");

            // For static partitioning, keep track of the number of active enumerators.
            if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators);

            return new InternalEnumerator(m_reader, this, m_index);
        }

        // Non-generic enumeration forwards to the generic implementation so that
        // callers going through System.Collections.IEnumerable also work
        // (previously this threw NotImplementedException).
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        public void Dispose()
        {
            if (!m_disposed)
            {
                // Only dispose the source enumerator here for dynamic partitioning;
                // for static partitioning DisposeEnumerator() takes care of it.
                if (!m_downcountEnumerators)
                {
                    m_reader.Dispose();
                }
                m_disposed = true;
            }
        }

        // Called from Dispose() of a spawned InternalEnumerator. During static
        // partitioning, the source enumerator is disposed once the count of
        // active InternalEnumerators drops to zero.
        public void DisposeEnumerator()
        {
            if (m_downcountEnumerators)
            {
                if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
                {
                    m_reader.Dispose();
                }
            }
        }
    }

    // Shared enumerator over the underlying collection: one instance per
    // partition, all reading from the same source enumerator.
    private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
    {
        private KeyValuePair<long, T> m_current;
        private readonly IEnumerator<T> m_source;
        private readonly InternalEnumerable m_controllingEnumerable;
        private readonly Shared<long> m_index;
        private bool m_disposed = false;

        public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
        {
            m_source = source;
            m_current = default(KeyValuePair<long, T>);
            m_controllingEnumerable = controllingEnumerable;
            m_index = index;
        }

        object IEnumerator.Current
        {
            get { return m_current; }
        }

        KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
        {
            get { return m_current; }
        }

        void IEnumerator.Reset()
        {
            throw new NotSupportedException("Reset() not supported");
        }

        // This method is the crux of this class. Under lock, it calls
        // MoveNext() on the underlying enumerator, grabs Current and the
        // shared index, and increments the index.
        bool IEnumerator.MoveNext()
        {
            bool rval = false;
            lock (m_source)
            {
                rval = m_source.MoveNext();
                if (rval)
                {
                    m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
                    m_index.Value = m_index.Value + 1;
                }
                else
                {
                    m_current = default(KeyValuePair<long, T>);
                }
            }
            return rval;
        }

        void IDisposable.Dispose()
        {
            if (!m_disposed)
            {
                // Delegate to the parent enumerable's DisposeEnumerator() method.
                m_controllingEnumerable.DisposeEnumerator();
                m_disposed = true;
            }
        }
    }

    /// <summary>Wraps <paramref name="enumerable"/> without enumerating it.</summary>
    /// <param name="enumerable">The collection to partition.</param>
    /// <exception cref="ArgumentNullException"><paramref name="enumerable"/> is null.</exception>
    // NOTE(review): all three base flags (keysOrderedInEachPartition,
    // keysOrderedAcrossPartitions, keysNormalized) are reported as true. Items are
    // handed to whichever partition asks next, so keysOrderedAcrossPartitions looks
    // questionable -- confirm before relying on it with order-sensitive consumers.
    public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
        : base(true, true, true)
    {
        // Verify that the source IEnumerable is not null.
        if (enumerable == null)
            throw new ArgumentNullException("enumerable");

        m_referenceEnumerable = enumerable;
    }

    /// <summary>
    /// Produces a list of <paramref name="numPartitions"/> enumerators that can each
    /// be used to traverse the underlying collection in a thread-safe manner.
    /// </summary>
    /// <remarks>
    /// This returns a fixed number of enumerators, as opposed to
    /// <c>GetOrderableDynamicPartitions()</c>, whose result can produce any number.
    /// </remarks>
    /// <param name="numPartitions">Number of partition enumerators to create; must be at least 1.</param>
    /// <returns>A list holding exactly <paramref name="numPartitions"/> enumerators.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numPartitions"/> is less than 1.</exception>
    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
    {
        if (numPartitions < 1)
            throw new ArgumentOutOfRangeException("numPartitions");

        List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);

        // Static partitioning: create an InternalEnumerable with reference counting
        // of spawned InternalEnumerators turned on. Once all of the spawned
        // enumerators are disposed, the source enumerator is disposed.
        var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
        for (int i = 0; i < numPartitions; i++)
            list.Add(dynamicPartitions.GetEnumerator());

        return list;
    }

    /// <summary>
    /// Returns an enumerable whose <c>GetEnumerator()</c> can be called multiple
    /// times to produce enumerators that share one pass over the collection.
    /// </summary>
    /// <returns>
    /// An enumerable that also implements <see cref="IDisposable"/>; disposing it
    /// disposes the underlying source enumerator.
    /// </returns>
    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        // Dynamic partitioning: reference counting of spawned InternalEnumerators is
        // turned off, so the returned InternalEnumerable must be explicitly disposed.
        return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
    }

    /// <summary>Always true: dynamic partitioning is supported.</summary>
    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}
以下示例演示如何使用上述 OrderablePartitioner 来调用 Parallel.ForEach。可以看到,这样就能把您的 finally 块从 ForEach 的实现中完全重构出去。
/// <summary>
/// Console demo that drives SingleElementOrderablePartitioner through
/// Parallel.ForEach: first a small visual run, then a 100000-element check.
/// </summary>
public class Program
{
    static void Main(string[] args)
    {
        // Part 1: print each word with the index handed out by the partitioner
        // and the id of the thread that processed it.
        string[] words = { "four", "score", "and", "twenty", "years", "ago" };
        SingleElementOrderablePartitioner<string> wordPartitioner =
            new SingleElementOrderablePartitioner<string>(words);

        Parallel.ForEach(wordPartitioner, (word, loopState, position) =>
        {
            int threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", word, position, threadId);
        });

        // Part 2: a more rigorous run. The source holds 0..99999, so every
        // item should equal the index it is paired with.
        List<int> numbers = new List<int>(Enumerable.Range(0, 100000));
        var numberPartitioner = new SingleElementOrderablePartitioner<int>(numbers);
        int processed = 0;
        bool sawMismatch = false;

        Parallel.ForEach(numberPartitioner, (number, loopState, position) =>
        {
            if (number != position)
            {
                sawMismatch = true;
            }
            Interlocked.Increment(ref processed);
        });

        if (sawMismatch)
        {
            Console.WriteLine("OrderablePartitioner Test: index mismatch detected");
        }
        Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", processed);
    }
}
另外,这个链接可能也有帮助("Write a simple parallel.ForEach Loop")
我是编程新手,目前正在攻读学位,同时也在学习 .NET。
我想用 Parallel.ForEach 并行迭代我的列表,并且希望在其中使用分区。由于知识不足,我的代码无法编译。
实际上我是先这样做的,效果很好。
Parallel.ForEach(MyBroker, broker =>
{
mybrow = new WeightageRowNumber();
mybrow.RowNumber = Interlocked.Increment(ref rowNumber);
lock (_lock)
{
Mylist.Add(mybrow);
}
});
现在我想使用分区,所以我用这种方式更改了我的代码,但现在我的代码无法编译。这是代码
Parallel.ForEach(MyBroker, broker,
(j, loop, subtotal) =>
{
mybrow = new WeightageRowNumber();
mybrow.RowNumber = Interlocked.Increment(ref rowNumber);
lock (_lock)
{
Mylist.Add(mybrow);
}
return brokerRowWeightageRowNumber.RowNumber;
},
(finalResult) =>
var rownum= Interlocked.Increment(ref finalResult);
console.writeline(rownum);
);
请查看我的第二组代码并向我展示如何重组以使用并行 foreach 的分区来迭代我的列表。
请指导我。谢谢
Parallel.ForEach 方法有 20 个重载 - 也许尝试不同的重载?
由于您没有提供相关依赖项,我无法针对您的实现给出一一对应的示例,但这里有一个较为深入的示例(根据链接中的原始示例重新排版),您可以将其复制到您的 IDE 中并设置调试断点(如果有帮助的话)。遗憾的是,要得到一个可实例化的 OrderablePartitioner 似乎并不简单,因此下面有大量样板代码,敬请见谅:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections;
using System.Linq;
/// <summary>
/// Orderable partitioner that extracts one (index, item) pair at a time,
/// in a thread-safe fashion, from the underlying collection.
/// </summary>
/// <remarks>
/// Every partition enumerator created from one call to
/// <c>GetOrderablePartitions</c> or <c>GetOrderableDynamicPartitions</c> shares a
/// single source enumerator and a single running index; both are advanced
/// under a lock inside <c>MoveNext</c>.
/// </remarks>
/// <typeparam name="T">Element type of the wrapped collection.</typeparam>
class SingleElementOrderablePartitioner<T> : OrderablePartitioner<T>
{
    // The collection being wrapped by this partitioner.
    private readonly IEnumerable<T> m_referenceEnumerable;

    // Mutable box used to share the running index between an
    // InternalEnumerable and the InternalEnumerators it spawns.
    private class Shared<U>
    {
        internal U Value;

        public Shared(U item)
        {
            Value = item;
        }
    }

    // Shared enumerable over the underlying collection. Each call to
    // GetEnumerator() returns a new InternalEnumerator that reads from the
    // same source enumerator and the same index counter.
    private class InternalEnumerable : IEnumerable<KeyValuePair<long, T>>, IDisposable
    {
        private readonly IEnumerator<T> m_reader;
        private readonly Shared<long> m_index;
        private bool m_disposed = false;

        // These two are used to implement Dispose() when static partitioning is being performed.
        private int m_activeEnumerators;
        private readonly bool m_downcountEnumerators;

        // "downcountEnumerators" is true for static partitioning and false for
        // dynamic partitioning.
        public InternalEnumerable(IEnumerator<T> reader, bool downcountEnumerators)
        {
            m_reader = reader;
            m_index = new Shared<long>(0);
            m_activeEnumerators = 0;
            m_downcountEnumerators = downcountEnumerators;
        }

        // Implicitly implements IEnumerable<KeyValuePair<long, T>>.GetEnumerator().
        public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
        {
            if (m_disposed)
                throw new ObjectDisposedException("InternalEnumerable: Can't call GetEnumerator() after disposing");

            // For static partitioning, keep track of the number of active enumerators.
            if (m_downcountEnumerators) Interlocked.Increment(ref m_activeEnumerators);

            return new InternalEnumerator(m_reader, this, m_index);
        }

        // Non-generic enumeration forwards to the generic implementation so that
        // callers going through System.Collections.IEnumerable also work
        // (previously this threw NotImplementedException).
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        public void Dispose()
        {
            if (!m_disposed)
            {
                // Only dispose the source enumerator here for dynamic partitioning;
                // for static partitioning DisposeEnumerator() takes care of it.
                if (!m_downcountEnumerators)
                {
                    m_reader.Dispose();
                }
                m_disposed = true;
            }
        }

        // Called from Dispose() of a spawned InternalEnumerator. During static
        // partitioning, the source enumerator is disposed once the count of
        // active InternalEnumerators drops to zero.
        public void DisposeEnumerator()
        {
            if (m_downcountEnumerators)
            {
                if (Interlocked.Decrement(ref m_activeEnumerators) == 0)
                {
                    m_reader.Dispose();
                }
            }
        }
    }

    // Shared enumerator over the underlying collection: one instance per
    // partition, all reading from the same source enumerator.
    private class InternalEnumerator : IEnumerator<KeyValuePair<long, T>>
    {
        private KeyValuePair<long, T> m_current;
        private readonly IEnumerator<T> m_source;
        private readonly InternalEnumerable m_controllingEnumerable;
        private readonly Shared<long> m_index;
        private bool m_disposed = false;

        public InternalEnumerator(IEnumerator<T> source, InternalEnumerable controllingEnumerable, Shared<long> index)
        {
            m_source = source;
            m_current = default(KeyValuePair<long, T>);
            m_controllingEnumerable = controllingEnumerable;
            m_index = index;
        }

        object IEnumerator.Current
        {
            get { return m_current; }
        }

        KeyValuePair<long, T> IEnumerator<KeyValuePair<long, T>>.Current
        {
            get { return m_current; }
        }

        void IEnumerator.Reset()
        {
            throw new NotSupportedException("Reset() not supported");
        }

        // This method is the crux of this class. Under lock, it calls
        // MoveNext() on the underlying enumerator, grabs Current and the
        // shared index, and increments the index.
        bool IEnumerator.MoveNext()
        {
            bool rval = false;
            lock (m_source)
            {
                rval = m_source.MoveNext();
                if (rval)
                {
                    m_current = new KeyValuePair<long, T>(m_index.Value, m_source.Current);
                    m_index.Value = m_index.Value + 1;
                }
                else
                {
                    m_current = default(KeyValuePair<long, T>);
                }
            }
            return rval;
        }

        void IDisposable.Dispose()
        {
            if (!m_disposed)
            {
                // Delegate to the parent enumerable's DisposeEnumerator() method.
                m_controllingEnumerable.DisposeEnumerator();
                m_disposed = true;
            }
        }
    }

    /// <summary>Wraps <paramref name="enumerable"/> without enumerating it.</summary>
    /// <param name="enumerable">The collection to partition.</param>
    /// <exception cref="ArgumentNullException"><paramref name="enumerable"/> is null.</exception>
    // NOTE(review): all three base flags (keysOrderedInEachPartition,
    // keysOrderedAcrossPartitions, keysNormalized) are reported as true. Items are
    // handed to whichever partition asks next, so keysOrderedAcrossPartitions looks
    // questionable -- confirm before relying on it with order-sensitive consumers.
    public SingleElementOrderablePartitioner(IEnumerable<T> enumerable)
        : base(true, true, true)
    {
        // Verify that the source IEnumerable is not null.
        if (enumerable == null)
            throw new ArgumentNullException("enumerable");

        m_referenceEnumerable = enumerable;
    }

    /// <summary>
    /// Produces a list of <paramref name="numPartitions"/> enumerators that can each
    /// be used to traverse the underlying collection in a thread-safe manner.
    /// </summary>
    /// <remarks>
    /// This returns a fixed number of enumerators, as opposed to
    /// <c>GetOrderableDynamicPartitions()</c>, whose result can produce any number.
    /// </remarks>
    /// <param name="numPartitions">Number of partition enumerators to create; must be at least 1.</param>
    /// <returns>A list holding exactly <paramref name="numPartitions"/> enumerators.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numPartitions"/> is less than 1.</exception>
    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int numPartitions)
    {
        if (numPartitions < 1)
            throw new ArgumentOutOfRangeException("numPartitions");

        List<IEnumerator<KeyValuePair<long, T>>> list = new List<IEnumerator<KeyValuePair<long, T>>>(numPartitions);

        // Static partitioning: create an InternalEnumerable with reference counting
        // of spawned InternalEnumerators turned on. Once all of the spawned
        // enumerators are disposed, the source enumerator is disposed.
        var dynamicPartitions = new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), true);
        for (int i = 0; i < numPartitions; i++)
            list.Add(dynamicPartitions.GetEnumerator());

        return list;
    }

    /// <summary>
    /// Returns an enumerable whose <c>GetEnumerator()</c> can be called multiple
    /// times to produce enumerators that share one pass over the collection.
    /// </summary>
    /// <returns>
    /// An enumerable that also implements <see cref="IDisposable"/>; disposing it
    /// disposes the underlying source enumerator.
    /// </returns>
    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        // Dynamic partitioning: reference counting of spawned InternalEnumerators is
        // turned off, so the returned InternalEnumerable must be explicitly disposed.
        return new InternalEnumerable(m_referenceEnumerable.GetEnumerator(), false);
    }

    /// <summary>Always true: dynamic partitioning is supported.</summary>
    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }
}
以下示例演示如何使用上述 OrderablePartitioner 来调用 Parallel.ForEach。可以看到,这样就能把您的 finally 块从 ForEach 的实现中完全重构出去。
/// <summary>
/// Console demo that drives SingleElementOrderablePartitioner through
/// Parallel.ForEach: first a small visual run, then a 100000-element check.
/// </summary>
public class Program
{
    static void Main(string[] args)
    {
        // Part 1: print each word with the index handed out by the partitioner
        // and the id of the thread that processed it.
        string[] words = { "four", "score", "and", "twenty", "years", "ago" };
        SingleElementOrderablePartitioner<string> wordPartitioner =
            new SingleElementOrderablePartitioner<string>(words);

        Parallel.ForEach(wordPartitioner, (word, loopState, position) =>
        {
            int threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("ForEach: item = {0}, index = {1}, thread id = {2}", word, position, threadId);
        });

        // Part 2: a more rigorous run. The source holds 0..99999, so every
        // item should equal the index it is paired with.
        List<int> numbers = new List<int>(Enumerable.Range(0, 100000));
        var numberPartitioner = new SingleElementOrderablePartitioner<int>(numbers);
        int processed = 0;
        bool sawMismatch = false;

        Parallel.ForEach(numberPartitioner, (number, loopState, position) =>
        {
            if (number != position)
            {
                sawMismatch = true;
            }
            Interlocked.Increment(ref processed);
        });

        if (sawMismatch)
        {
            Console.WriteLine("OrderablePartitioner Test: index mismatch detected");
        }
        Console.WriteLine("OrderablePartitioner test: counter = {0}, should be 100000", processed);
    }
}
另外,这个链接可能也有帮助("Write a simple parallel.ForEach Loop")