为什么 BlockingCollection<T> 会随着您向其添加更多线程而变慢?
Why does BlockingCollection<T> slow down as you add more threads to it?
我正在对 BlockingCollection 进行一些分析,以便在 UDP 数据处理场景中使用它。目前,它以单一消费者方式使用,将传入的 UDP 数据写入集合,然后进行相应处理。
我想我会做一些关于有多个消费者/任务来提高性能的分析,但我看到了一些奇怪的结果,所以我一定是遗漏了一些东西。
仅供参考,取消令牌未用于分析。
本质是排队1000000个号码,然后从不同的线程中取出它们,我认为这会提高性能,但它有相反的影响。
这是测试/分析的基本设置(控制台应用程序)
static BlockingCollection<int> Queue = new BlockingCollection<int>();
static void Main(string[] args) {
m_tokenProcessData = new CancellationTokenSource();
m_cancellationToken = m_tokenProcessData.Token;
PrepareQueue();
StartTasks(1);
Console.ReadKey();
}
static void PrepareQueue() {
for (int i = 0; i <= 1000000; i++) {
Queue.Add(i);
}
}
static void StartTasks( int maxTasks ) {
m_startTime = DateTime.Now;
for(int i=0; i<=maxTasks; i++ ) {
Task.Factory.StartNew(() => ProcessData(), m_cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
static void ProcessData( ) {
foreach( var number in Queue.GetConsumingEnumerable() ) {
Task.Delay(10);
var test = Queue.Count;
if (test == 0) Finish();
}
}
static void Finish() {
var endTime = DateTime.Now;
var timeTaken = (endTime - m_startTime).TotalMilliseconds;
Console.WriteLine($"Processing Took {timeTaken}ms");
}
Task.Delay(10) 只是在那里模拟正在完成的一些工作。
测试结果
1 Task = 3217ms
2 Tasks = 3178ms
4 Tasks = 3365ms
8 Tasks = 3986ms
16 Tasks = 4380ms
32 Tasks = 3954ms
64 Tasks = 4854ms
任何人都可以帮助解决我可能遗漏/不理解的问题吗?
谢谢,
丹尼尔.
BlockingCollection<T>
是一个线程安全的组件,并且包含一个同步原语(一个 SemaphoreSlim
对象)。我的猜测是你扔给它的线程越多,信号量必须等待的时间就越多。请参阅 BlockingCollection 源代码中的 Line 431。
在一个问题上投入更多线程也会增加开销。经常发生最佳解决方案不是更多线程。根据 Producer/Consumer 情况,最佳线程数通常是 您计算机中的核心数, 因为除此之外,线程只是在执行任务-切换。
我正在对 BlockingCollection 进行一些分析,以便在 UDP 数据处理场景中使用它。目前,它以单一消费者方式使用,将传入的 UDP 数据写入集合,然后进行相应处理。
我想我会做一些关于有多个消费者/任务来提高性能的分析,但我看到了一些奇怪的结果,所以我一定是遗漏了一些东西。
仅供参考,取消令牌未用于分析。
本质是排队1000000个号码,然后从不同的线程中取出它们,我认为这会提高性能,但它有相反的影响。
这是测试/分析的基本设置(控制台应用程序)
static BlockingCollection<int> Queue = new BlockingCollection<int>();
static void Main(string[] args) {
m_tokenProcessData = new CancellationTokenSource();
m_cancellationToken = m_tokenProcessData.Token;
PrepareQueue();
StartTasks(1);
Console.ReadKey();
}
static void PrepareQueue() {
for (int i = 0; i <= 1000000; i++) {
Queue.Add(i);
}
}
static void StartTasks( int maxTasks ) {
m_startTime = DateTime.Now;
for(int i=0; i<=maxTasks; i++ ) {
Task.Factory.StartNew(() => ProcessData(), m_cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
static void ProcessData( ) {
foreach( var number in Queue.GetConsumingEnumerable() ) {
Task.Delay(10);
var test = Queue.Count;
if (test == 0) Finish();
}
}
static void Finish() {
var endTime = DateTime.Now;
var timeTaken = (endTime - m_startTime).TotalMilliseconds;
Console.WriteLine($"Processing Took {timeTaken}ms");
}
Task.Delay(10) 只是在那里模拟正在完成的一些工作。
测试结果
1 Task = 3217ms
2 Tasks = 3178ms
4 Tasks = 3365ms
8 Tasks = 3986ms
16 Tasks = 4380ms
32 Tasks = 3954ms
64 Tasks = 4854ms
任何人都可以帮助解决我可能遗漏/不理解的问题吗?
谢谢,
丹尼尔.
BlockingCollection<T>
是一个线程安全的组件,并且包含一个同步原语(一个 SemaphoreSlim
对象)。我的猜测是你扔给它的线程越多,信号量必须等待的时间就越多。请参阅 BlockingCollection 源代码中的 Line 431。
在一个问题上投入更多线程也会增加开销。经常发生最佳解决方案不是更多线程。根据 Producer/Consumer 情况,最佳线程数通常是 您计算机中的核心数, 因为除此之外,线程只是在执行任务-切换。