阻止包含多个元素的集合?
Blocking collection with multiple elements?
生产者-消费者集合 [1] [2] 的所有 C# 实现似乎都有类似于以下的接口:
private Queue<T> items;
public void Produce(T item)
public T Consume()
有没有像下面这样的实现?
private Queue<T> items;
public void Produce(T[] item)
public T[] Consume(int count)
希望这能让我一次 produce/consume 不同数量的项目,而不需要过多的每个项目锁定。这对于 producing/consuming 大量项目似乎是必要的,但我没有找到任何实现的运气。
[1] C# producer/consumer
[2] Creating a blocking Queue<T> in .NET?
The hope is that this would let me produce/consume varying numbers of items at a time without requiring excessive per-item locking.
您可以使用 BlockingCollection<T>
class;虽然它没有添加或获取多个项目的方法,但它内部不使用锁。
有多种可能的方法,具体取决于您要实施的内容。
有IProducerConsumerCollection<T>
interface. The only thread safe implementation of this interface in the .NET framework to my knowledge is the BlockingCollection<T>
的实现。
这 class 允许您拥有阻塞或非阻塞的生产者和消费者。生产者端通过在构造函数中为集合提供容量限制来设置在阻塞和非阻塞之间。正如 BlockingCollection<T>.Add(T)
方法的文档所述:
If a bounded capacity was specified when this instance of BlockingCollection<T>
was initialized, a call to Add may block until space is available to store the provided item.
要获取项目,您可以使用不同的 Take
和 TryTake
方法,或者调用非常方便的 BlockingCollection<T>.GetConsumingEnumerable()
method that creates a IEnumerable<T>
that creates a IEnumerator<T>
that consumes one element from the BlockingCollection<T>
when fetching the next value and blocking in case the source collection is empty. That is until BlockingCollection<T>.CompleteAdding()
并且集合不接受任何新数据。此时所有消耗可枚举实例的实例将停止阻塞并报告没有数据了(一旦所有剩余数据都被消耗完。)
所以你基本上可以像这样实现一个消费者:
BlockingCollection<...> bc = ...
foreach (var item in bc.GetConsumingEnumerable())
{
// do something with your item
}
这样的消费者可以在多个线程中启动,因此如果您愿意,您可以从源中读取多个线程。您可以创建任意数量的消费枚举。
您应该知道这个集合实际上只是一个包装器。有一个构造函数允许您设置所使用的集合类型。默认情况下 ConcurrentQueue<T>
。这意味着默认情况下集合的行为类似于此队列并且是先进先出集合,以防您只使用一个生产者和一个消费者。
综上所述,还有一个选择。如果您不需要阻塞部分(或者您想自己实现阻塞部分)并且不需要集合中的任何元素顺序,则可以使用 ConcurrentBag<T>
. This collection handles access from multiple threads, very efficiently. It uses smaller collections inside ThreadLocal<T>
包装器。所以每个线程都使用它自己的存储,只有当线程用完它自己的存储中的项目时,它才会开始从另一个线程存储中获取项目。
如果生产和消费在您的用例中按顺序发生,则使用此集合可能会很有趣。所以你首先添加所有项目,一旦完成你消耗所有项目,两者都有多个线程。
正好需要这个,我自己创建了一个方法扩展。请注意,如果至少有一个元素从队列中移除,此调用将记录任何进一步的异常并返回该元素以防止丢失任何内容。
public static class BlockingCollectionMethodExtensions
{
public static List<T> FetchAtLeastOneBlocking<T>(this BlockingCollection<T> threadSafeQueue, int maxCount, ICommonLog log)
{
var resultList = new List<T>();
// Take() will block the thread until new elements appear
// It will also throw an InvalidOperationException when blockingCollection is Completed
resultList.Add(threadSafeQueue.Take());
try
{
// Fetch more unblocking
while (threadSafeQueue.Count > 0 && resultList.Count < maxCount)
{
T item;
bool success = false;
success = threadSafeQueue.TryTake(out item);
if (success)
{
resultList.Add(item);
}
else
{
}
}
}
catch (Exception ex)
{
log.Fatal($"Unknown error fetching more elements. Continuing to process the {resultList.Count} already fetched items.", ex);
}
return resultList;
}
}
以及相应的测试:
public class BlockingCollectionMethodExtensionsTest : UnitTestBase
{
[Fact]
public void FetchAtLeastOneBlocking_FirstEmpty_ThenSingleEntryAdded_ExpectBlocking_Test()
{
var queue = new BlockingCollection<int>();
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for 2 seconds to ensure that nothing will be fetched
Thread.Sleep(TimeSpan.FromSeconds(1));
Assert.Null(fetchResult);
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Single(fetchResult);
Assert.Equal(78, fetchResult.Single());
}
[Fact]
public void FetchAtLeastOneBlocking_FirstEmpty_ThenCompleted_ExpectOperationException_Test()
{
var queue = new BlockingCollection<int>();
Exception catchedException = null;
var startEvent = new ManualResetEvent(initialState: false);
var exceptionEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
try
{
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
}
catch (Exception ex)
{
catchedException = ex;
exceptionEvent.Set();
}
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for 2 seconds to ensure that nothing will be fetched
Thread.Sleep(TimeSpan.FromSeconds(1));
Assert.Null(fetchResult);
// Now complete the queue and assert that fetching threw the expected exception
queue.CompleteAdding();
// Wait for the exception to be thrown
var exceptionSuccess = exceptionEvent.WaitOne(TimeSpan.FromSeconds(2));
Assert.True(exceptionSuccess);
Assert.NotNull(catchedException);
Assert.IsType<InvalidOperationException>(catchedException);
}
[Fact]
public void FetchAtLeastOneBlocking_SingleEntryExists_ExpectNonblocking_Test()
{
var queue = new BlockingCollection<int>();
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for expected immediate completion
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Single(fetchResult);
Assert.Equal(78, fetchResult.Single());
}
[Fact]
public void FetchAtLeastOneBlocking_MultipleEntriesExist_ExpectNonblocking_Test()
{
var queue = new BlockingCollection<int>();
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
queue.Add(79);
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for expected immediate completion
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Equal(2, fetchResult.Count);
Assert.Equal(78, fetchResult[0]);
Assert.Equal(79, fetchResult[1]);
}
[Fact]
public void FetchAtLeastOneBlocking_MultipleEntriesExist_MaxCountExceeded_ExpectNonblocking_Test()
{
var queue = new BlockingCollection<int>();
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
queue.Add(79);
queue.Add(80);
queue.Add(81);
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for expected immediate completion
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Equal(3, fetchResult.Count);
Assert.Equal(78, fetchResult[0]);
Assert.Equal(79, fetchResult[1]);
Assert.Equal(80, fetchResult[2]);
}
}
生产者-消费者集合 [1] [2] 的所有 C# 实现似乎都有类似于以下的接口:
private Queue<T> items;
public void Produce(T item)
public T Consume()
有没有像下面这样的实现?
private Queue<T> items;
public void Produce(T[] item)
public T[] Consume(int count)
希望这能让我一次 produce/consume 不同数量的项目,而不需要过多的每个项目锁定。这对于 producing/consuming 大量项目似乎是必要的,但我没有找到任何实现的运气。
[1] C# producer/consumer
[2] Creating a blocking Queue<T> in .NET?
The hope is that this would let me produce/consume varying numbers of items at a time without requiring excessive per-item locking.
您可以使用 BlockingCollection<T>
class;虽然它没有添加或获取多个项目的方法,但它内部不使用锁。
有多种可能的方法,具体取决于您要实施的内容。
有IProducerConsumerCollection<T>
interface. The only thread safe implementation of this interface in the .NET framework to my knowledge is the BlockingCollection<T>
的实现。
这 class 允许您拥有阻塞或非阻塞的生产者和消费者。生产者端通过在构造函数中为集合提供容量限制来设置在阻塞和非阻塞之间。正如 BlockingCollection<T>.Add(T)
方法的文档所述:
If a bounded capacity was specified when this instance of
BlockingCollection<T>
was initialized, a call to Add may block until space is available to store the provided item.
要获取项目,您可以使用不同的 Take
和 TryTake
方法,或者调用非常方便的 BlockingCollection<T>.GetConsumingEnumerable()
method that creates a IEnumerable<T>
that creates a IEnumerator<T>
that consumes one element from the BlockingCollection<T>
when fetching the next value and blocking in case the source collection is empty. That is until BlockingCollection<T>.CompleteAdding()
并且集合不接受任何新数据。此时所有消耗可枚举实例的实例将停止阻塞并报告没有数据了(一旦所有剩余数据都被消耗完。)
所以你基本上可以像这样实现一个消费者:
BlockingCollection<...> bc = ...
foreach (var item in bc.GetConsumingEnumerable())
{
// do something with your item
}
这样的消费者可以在多个线程中启动,因此如果您愿意,您可以从源中读取多个线程。您可以创建任意数量的消费枚举。
您应该知道这个集合实际上只是一个包装器。有一个构造函数允许您设置所使用的集合类型。默认情况下 ConcurrentQueue<T>
。这意味着默认情况下集合的行为类似于此队列并且是先进先出集合,以防您只使用一个生产者和一个消费者。
综上所述,还有一个选择。如果您不需要阻塞部分(或者您想自己实现阻塞部分)并且不需要集合中的任何元素顺序,则可以使用 ConcurrentBag<T>
. This collection handles access from multiple threads, very efficiently. It uses smaller collections inside ThreadLocal<T>
包装器。所以每个线程都使用它自己的存储,只有当线程用完它自己的存储中的项目时,它才会开始从另一个线程存储中获取项目。
如果生产和消费在您的用例中按顺序发生,则使用此集合可能会很有趣。所以你首先添加所有项目,一旦完成你消耗所有项目,两者都有多个线程。
正好需要这个,我自己创建了一个方法扩展。请注意,如果至少有一个元素从队列中移除,此调用将记录任何进一步的异常并返回该元素以防止丢失任何内容。
public static class BlockingCollectionMethodExtensions
{
public static List<T> FetchAtLeastOneBlocking<T>(this BlockingCollection<T> threadSafeQueue, int maxCount, ICommonLog log)
{
var resultList = new List<T>();
// Take() will block the thread until new elements appear
// It will also throw an InvalidOperationException when blockingCollection is Completed
resultList.Add(threadSafeQueue.Take());
try
{
// Fetch more unblocking
while (threadSafeQueue.Count > 0 && resultList.Count < maxCount)
{
T item;
bool success = false;
success = threadSafeQueue.TryTake(out item);
if (success)
{
resultList.Add(item);
}
else
{
}
}
}
catch (Exception ex)
{
log.Fatal($"Unknown error fetching more elements. Continuing to process the {resultList.Count} already fetched items.", ex);
}
return resultList;
}
}
以及相应的测试:
public class BlockingCollectionMethodExtensionsTest : UnitTestBase
{
[Fact]
public void FetchAtLeastOneBlocking_FirstEmpty_ThenSingleEntryAdded_ExpectBlocking_Test()
{
var queue = new BlockingCollection<int>();
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for 2 seconds to ensure that nothing will be fetched
Thread.Sleep(TimeSpan.FromSeconds(1));
Assert.Null(fetchResult);
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Single(fetchResult);
Assert.Equal(78, fetchResult.Single());
}
[Fact]
public void FetchAtLeastOneBlocking_FirstEmpty_ThenCompleted_ExpectOperationException_Test()
{
var queue = new BlockingCollection<int>();
Exception catchedException = null;
var startEvent = new ManualResetEvent(initialState: false);
var exceptionEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
try
{
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
}
catch (Exception ex)
{
catchedException = ex;
exceptionEvent.Set();
}
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for 2 seconds to ensure that nothing will be fetched
Thread.Sleep(TimeSpan.FromSeconds(1));
Assert.Null(fetchResult);
// Now complete the queue and assert that fetching threw the expected exception
queue.CompleteAdding();
// Wait for the exception to be thrown
var exceptionSuccess = exceptionEvent.WaitOne(TimeSpan.FromSeconds(2));
Assert.True(exceptionSuccess);
Assert.NotNull(catchedException);
Assert.IsType<InvalidOperationException>(catchedException);
}
[Fact]
public void FetchAtLeastOneBlocking_SingleEntryExists_ExpectNonblocking_Test()
{
var queue = new BlockingCollection<int>();
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for expected immediate completion
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Single(fetchResult);
Assert.Equal(78, fetchResult.Single());
}
[Fact]
public void FetchAtLeastOneBlocking_MultipleEntriesExist_ExpectNonblocking_Test()
{
var queue = new BlockingCollection<int>();
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
queue.Add(79);
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for expected immediate completion
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Equal(2, fetchResult.Count);
Assert.Equal(78, fetchResult[0]);
Assert.Equal(79, fetchResult[1]);
}
[Fact]
public void FetchAtLeastOneBlocking_MultipleEntriesExist_MaxCountExceeded_ExpectNonblocking_Test()
{
var queue = new BlockingCollection<int>();
// Add a new element and verify that the fetch method succeeded
queue.Add(78);
queue.Add(79);
queue.Add(80);
queue.Add(81);
var startEvent = new ManualResetEvent(initialState: false);
var completedEvent = new ManualResetEvent(initialState: false);
List<int> fetchResult = null;
var thread = new Thread(() =>
{
startEvent.Set();
fetchResult = queue.FetchAtLeastOneBlocking<int>(maxCount: 3, log: null);
completedEvent.Set();
});
thread.Start();
var startedSuccess = startEvent.WaitOne(TimeSpan.FromSeconds(2)); // Wait until started
Assert.True(startedSuccess);
// Now wait for expected immediate completion
var completedSuccess = completedEvent.WaitOne(timeout: TimeSpan.FromSeconds(2));
Assert.True(completedSuccess);
Assert.NotNull(fetchResult);
Assert.Equal(3, fetchResult.Count);
Assert.Equal(78, fetchResult[0]);
Assert.Equal(79, fetchResult[1]);
Assert.Equal(80, fetchResult[2]);
}
}