FIFO 订单中的 TPL 生产者消费者#
TPL Producer Consumer in a FIFO order C#
我仅限于 .NET 3.5,并且我使用的是 TPL。
场景是producer-consumer但是不存在阻塞的问题。
PLINQ不能用于这种场景(因为限制),我们要实现的是最快的方式生产很多项目(其中每次生产都是long-运行ning,项目数量超过100,000)但是每个项目都必须以 FIFO 顺序消耗(这意味着,我要求生产的第一个项目必须首先被消耗,即使它是在其他项目之后创建的)并且也尽可能快地被消耗。
对于这个问题,我尝试使用任务列表,等待列表中的第一项完成 (taskList.First().IsCompleted()),然后在其上使用消费函数,但是对于出于某种原因,我似乎 运行 内存不足(可能是任务列表中的项目太多,因为任务等待开始?)有没有更好的方法来做到这一点? (我正在努力实现最快的速度)
非常感谢!
我认为这是一个有趣的问题,所以我花了一些时间。
我理解的场景是这样的:
- 您有一个已满的 BlockingCollection
- 多个线程启动,每个线程都尝试添加到 BlockingCollection。这些调用都会阻塞;这就是它们需要并行发生的原因。
- 随着 space 可用,Add 调用将畅通无阻。
- 对
Add
的调用需要按接收顺序完成。
首先说一下代码结构。与其使用 BlockingCollection 并围绕它编写过程代码,我建议扩展 BlockingCollection 并用您需要的功能替换 Add 方法。它可能看起来像这样:
public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
private FifoMonitor monitor = new FifoMonitor();
public QueuedBlockingCollection(int max) : base (max) {}
public void Enqueue(T item)
{
using (monitor.Lock())
{
base.Add(item);
}
}
}
这里的诀窍是使用 FifoMonitor
class,它将为您提供 lock
的功能,但会强制执行命令。不幸的是,CLR 中不存在这样的 class。但是我们可以 write one:
public class FifoMonitor
{
public class FifoCriticalSection : IDisposable
{
private readonly FifoMonitor _parent;
public FifoCriticalSection(FifoMonitor parent)
{
_parent = parent;
_parent.Enter();
}
public void Dispose()
{
_parent.Exit();
}
}
private object _innerLock = new object();
private volatile int counter = 0;
private volatile int current = 1;
public FifoCriticalSection Lock()
{
return new FifoCriticalSection(this);
}
private void Enter()
{
int mine = Interlocked.Increment(ref counter);
Monitor.Enter(_innerLock);
while (current != mine) Monitor.Wait(_innerLock);
}
private void Exit()
{
Interlocked.Increment(ref current);
Monitor.PulseAll(_innerLock);
Monitor.Exit(_innerLock);
}
}
现在开始测试。这是我的程序:
public class Program
{
public static void Main()
{
//Setup
var blockingCollection = new QueuedBlockingCollection<int>(10);
var tasks = new Task[10];
//Block the collection by filling it up
for (int i=1; i<=10; i++) blockingCollection.Add(99);
//Start 10 threads all trying to add another value
for (int i=1; i<=10; i++)
{
int index = i; //unclose
tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
Task.Delay(100).Wait(); //Wait long enough for the Enqueue call to block
}
//Purge the collection, making room for more values
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
//Wait for our pending adds to complete
Task.WaitAll(tasks);
//Display the collection in the order read
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
}
}
输出:
99
99
99
99
99
99
99
99
99
99
1
2
3
4
5
6
7
8
9
10
看起来有效!但为了确定,我将 Enqueue
改回 Add
,以确保解决方案确实有所作为。果然,它最终与常规 Add
.
乱序
99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10
查看 DotNetFiddle
上的代码
编辑后确定 - 不要将结果添加到 BlockingCollection 中,而是将任务添加到阻塞集合中。这具有按顺序处理项目的功能,并且具有最大并行度,这将防止启动太多线程并耗尽所有内存。
https://dotnetfiddle.net/lUbSqB
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
public class Program
{
private static BlockingCollection<Task<int>> BlockingCollection {get;set;}
public static void Producer(int numTasks)
{
Random r = new Random(7);
for(int i = 0 ; i < numTasks ; i++)
{
int closured = i;
Task<int> task = new Task<int>(()=>
{
Thread.Sleep(r.Next(100));
Console.WriteLine("Produced: " + closured);
return closured;
});
BlockingCollection.Add(task);
task.Start();
}
BlockingCollection.CompleteAdding();
}
public static void Main()
{
int numTasks = 20;
int maxParallelism = 3;
BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);
Task.Factory.StartNew(()=> Producer(numTasks));
foreach(var task in BlockingCollection.GetConsumingEnumerable())
{
task.Wait();
Console.WriteLine(" Consumed: "+ task.Result);
task.Dispose();
}
}
}
结果:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 3
Produced: 2
Consumed: 2
Consumed: 3
Produced: 4
Consumed: 4
Produced: 6
Produced: 5
Consumed: 5
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 10
Produced: 9
Consumed: 9
Consumed: 10
Produced: 12
Produced: 13
Produced: 11
Consumed: 11
Consumed: 12
Consumed: 13
Produced: 15
Produced: 14
Consumed: 14
Consumed: 15
Produced: 17
Produced: 16
Produced: 18
Consumed: 16
Consumed: 17
Consumed: 18
Produced: 19
Consumed: 19
我仅限于 .NET 3.5,并且我使用的是 TPL。 场景是producer-consumer但是不存在阻塞的问题。 PLINQ不能用于这种场景(因为限制),我们要实现的是最快的方式生产很多项目(其中每次生产都是long-运行ning,项目数量超过100,000)但是每个项目都必须以 FIFO 顺序消耗(这意味着,我要求生产的第一个项目必须首先被消耗,即使它是在其他项目之后创建的)并且也尽可能快地被消耗。
对于这个问题,我尝试使用任务列表,等待列表中的第一项完成 (taskList.First().IsCompleted()),然后在其上使用消费函数,但是对于出于某种原因,我似乎 运行 内存不足(可能是任务列表中的项目太多,因为任务等待开始?)有没有更好的方法来做到这一点? (我正在努力实现最快的速度)
非常感谢!
我认为这是一个有趣的问题,所以我花了一些时间。
我理解的场景是这样的:
- 您有一个已满的 BlockingCollection
- 多个线程启动,每个线程都尝试添加到 BlockingCollection。这些调用都会阻塞;这就是它们需要并行发生的原因。
- 随着 space 可用,Add 调用将畅通无阻。
- 对
Add
的调用需要按接收顺序完成。
首先说一下代码结构。与其使用 BlockingCollection 并围绕它编写过程代码,我建议扩展 BlockingCollection 并用您需要的功能替换 Add 方法。它可能看起来像这样:
public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
private FifoMonitor monitor = new FifoMonitor();
public QueuedBlockingCollection(int max) : base (max) {}
public void Enqueue(T item)
{
using (monitor.Lock())
{
base.Add(item);
}
}
}
这里的诀窍是使用 FifoMonitor
class,它将为您提供 lock
的功能,但会强制执行命令。不幸的是,CLR 中不存在这样的 class。但是我们可以 write one:
public class FifoMonitor
{
public class FifoCriticalSection : IDisposable
{
private readonly FifoMonitor _parent;
public FifoCriticalSection(FifoMonitor parent)
{
_parent = parent;
_parent.Enter();
}
public void Dispose()
{
_parent.Exit();
}
}
private object _innerLock = new object();
private volatile int counter = 0;
private volatile int current = 1;
public FifoCriticalSection Lock()
{
return new FifoCriticalSection(this);
}
private void Enter()
{
int mine = Interlocked.Increment(ref counter);
Monitor.Enter(_innerLock);
while (current != mine) Monitor.Wait(_innerLock);
}
private void Exit()
{
Interlocked.Increment(ref current);
Monitor.PulseAll(_innerLock);
Monitor.Exit(_innerLock);
}
}
现在开始测试。这是我的程序:
public class Program
{
public static void Main()
{
//Setup
var blockingCollection = new QueuedBlockingCollection<int>(10);
var tasks = new Task[10];
//Block the collection by filling it up
for (int i=1; i<=10; i++) blockingCollection.Add(99);
//Start 10 threads all trying to add another value
for (int i=1; i<=10; i++)
{
int index = i; //unclose
tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
Task.Delay(100).Wait(); //Wait long enough for the Enqueue call to block
}
//Purge the collection, making room for more values
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
//Wait for our pending adds to complete
Task.WaitAll(tasks);
//Display the collection in the order read
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
}
}
输出:
99
99
99
99
99
99
99
99
99
99
1
2
3
4
5
6
7
8
9
10
看起来有效!但为了确定,我将 Enqueue
改回 Add
,以确保解决方案确实有所作为。果然,它最终与常规 Add
.
99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10
查看 DotNetFiddle
上的代码编辑后确定 - 不要将结果添加到 BlockingCollection 中,而是将任务添加到阻塞集合中。这具有按顺序处理项目的功能,并且具有最大并行度,这将防止启动太多线程并耗尽所有内存。
https://dotnetfiddle.net/lUbSqB
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
public class Program
{
private static BlockingCollection<Task<int>> BlockingCollection {get;set;}
public static void Producer(int numTasks)
{
Random r = new Random(7);
for(int i = 0 ; i < numTasks ; i++)
{
int closured = i;
Task<int> task = new Task<int>(()=>
{
Thread.Sleep(r.Next(100));
Console.WriteLine("Produced: " + closured);
return closured;
});
BlockingCollection.Add(task);
task.Start();
}
BlockingCollection.CompleteAdding();
}
public static void Main()
{
int numTasks = 20;
int maxParallelism = 3;
BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);
Task.Factory.StartNew(()=> Producer(numTasks));
foreach(var task in BlockingCollection.GetConsumingEnumerable())
{
task.Wait();
Console.WriteLine(" Consumed: "+ task.Result);
task.Dispose();
}
}
}
结果:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 3
Produced: 2
Consumed: 2
Consumed: 3
Produced: 4
Consumed: 4
Produced: 6
Produced: 5
Consumed: 5
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 10
Produced: 9
Consumed: 9
Consumed: 10
Produced: 12
Produced: 13
Produced: 11
Consumed: 11
Consumed: 12
Consumed: 13
Produced: 15
Produced: 14
Consumed: 14
Consumed: 15
Produced: 17
Produced: 16
Produced: 18
Consumed: 16
Consumed: 17
Consumed: 18
Produced: 19
Consumed: 19