无法将项目排队到抽象中使用的 BlockingCollection class
Cannot enqueue items to BlockingCollection used in abstract class
如标题所示,我利用抽象 class 创建可重用基础 class 以使用 BlockingCollection 添加和删除项目。我使用生产者/消费者模式来 enqueue 和 dequeue 项目,抽象 class 是通用的,所以我可以为 queue 指定一个 object。
当我调用基础 classes Enqueue 和 Dequeue 方法时,我的问题出现了,它们似乎引用了 queue 的不同实例,即,当我调用 dequeue 时,queue 上没有项目,而 enqueues TryAdd() returns 为真。但是,当我直接在 queue 实例上调用 Add 或 Take 时,它工作正常。
谁能告诉我为什么基础 classes Enqueue 和 Dequeue 方法不能像我预期的那样工作,我已经尝试查找 BlockingCollections 的几种用途但不能看看为什么。
Program.cs
static void Main(string[] args)
{
var processor = new Processor(1,4);
Console.ReadKey();
}
抽象基础class
public abstract class BaseProcessor<T>: IDisposable
{
protected BlockingCollection<T> _queue;
private CancellationTokenSource _tokenSource;
private int _producers;
private int _consumers;
private List<Task> _tasks;
public BaseProcessor(int producers, int consumers)
{
_queue = new BlockingCollection<T>();
_tokenSource = new CancellationTokenSource();
_producers = producers;
_consumers = consumers;
_tasks = new List<Task>();
}
protected void Setup()
{
Parallel.For(0, _producers, i =>
_tasks.Add(Task.Factory.StartNew(() => Produce(_tokenSource.Token), _tokenSource.Token)
.ContinueWith((task) =>
{
Console.WriteLine("Task {0} stopped", task.Id);
}))
);
Parallel.For(0, _producers, i =>
_tasks.Add(Task.Factory.StartNew(() => Consume(_tokenSource.Token), _tokenSource.Token)
.ContinueWith((task) =>
{
Console.WriteLine("Task {0} stopped", task.Id);
}))
);
}
protected abstract void Produce(CancellationToken token);
protected abstract void Consume(CancellationToken token);
protected void Enqueue(T item)
{
try
{
var res = _queue.TryAdd(item, TimeSpan.FromSeconds(1));
}
catch (Exception ex)
{
Console.WriteLine("Could not add item to queue: {0}", ex);
}
}
protected void Enqueue(List<T> items)
{
items.ForEach(o => Enqueue(o));
}
protected T Dequeue()
{
try
{
T item;
while (_queue.TryTake(out item, TimeSpan.FromSeconds(1))) ;
return item;
}
catch (Exception ex)
{
Console.WriteLine("Could not remove item to queue: {0}", ex);
return default(T);
}
}
public void Dispose()
{
// Cancel all tokens
_tokenSource.Cancel();
// Wait for all tasks complete
Task.WaitAll(_tasks.ToArray());
_queue.Dispose();
}
}
摘要class实施
public class Processor: BaseProcessor<QueueItem>
{
private int _counter;
public Processor(int producers, int consumers): base(producers, consumers)
{
_counter = 0;
Setup();
}
protected override void Produce(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var queueItem = new QueueItem()
{
Id = _counter,
Timestamp = DateTime.Now
};
//Enqueue(queueItem);
_queue.Add(queueItem);
Console.WriteLine("Enqueued: {0}", _counter);
_counter++;
Thread.Sleep(3000);
}
}
protected override void Consume(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var item = Dequeue();
//var item = _queue.Take();
Console.WriteLine("Dequeued: {0}", item.Id);
}
}
}
我发现了原因,它是我从我正在使用的 link 中获取的一些代码的实现。 https://www.infoworld.com/article/3090215/how-to-work-with-blockingcollection-in-c.html
while (_queue.TryTake(out item, TimeSpan.FromSeconds(1)));正在转储队列中的所有项目,它是 运行 直到队列实际上是空的。从教程中我推测它是 运行 虽然有一些东西可以尝试和采取。
代我盲目复制粘贴问题
如标题所示,我利用抽象 class 创建可重用基础 class 以使用 BlockingCollection 添加和删除项目。我使用生产者/消费者模式来 enqueue 和 dequeue 项目,抽象 class 是通用的,所以我可以为 queue 指定一个 object。
当我调用基础 classes Enqueue 和 Dequeue 方法时,我的问题出现了,它们似乎引用了 queue 的不同实例,即,当我调用 dequeue 时,queue 上没有项目,而 enqueues TryAdd() returns 为真。但是,当我直接在 queue 实例上调用 Add 或 Take 时,它工作正常。
谁能告诉我为什么基础 classes Enqueue 和 Dequeue 方法不能像我预期的那样工作,我已经尝试查找 BlockingCollections 的几种用途但不能看看为什么。
Program.cs
static void Main(string[] args)
{
var processor = new Processor(1,4);
Console.ReadKey();
}
抽象基础class
public abstract class BaseProcessor<T>: IDisposable
{
protected BlockingCollection<T> _queue;
private CancellationTokenSource _tokenSource;
private int _producers;
private int _consumers;
private List<Task> _tasks;
public BaseProcessor(int producers, int consumers)
{
_queue = new BlockingCollection<T>();
_tokenSource = new CancellationTokenSource();
_producers = producers;
_consumers = consumers;
_tasks = new List<Task>();
}
protected void Setup()
{
Parallel.For(0, _producers, i =>
_tasks.Add(Task.Factory.StartNew(() => Produce(_tokenSource.Token), _tokenSource.Token)
.ContinueWith((task) =>
{
Console.WriteLine("Task {0} stopped", task.Id);
}))
);
Parallel.For(0, _producers, i =>
_tasks.Add(Task.Factory.StartNew(() => Consume(_tokenSource.Token), _tokenSource.Token)
.ContinueWith((task) =>
{
Console.WriteLine("Task {0} stopped", task.Id);
}))
);
}
protected abstract void Produce(CancellationToken token);
protected abstract void Consume(CancellationToken token);
protected void Enqueue(T item)
{
try
{
var res = _queue.TryAdd(item, TimeSpan.FromSeconds(1));
}
catch (Exception ex)
{
Console.WriteLine("Could not add item to queue: {0}", ex);
}
}
protected void Enqueue(List<T> items)
{
items.ForEach(o => Enqueue(o));
}
protected T Dequeue()
{
try
{
T item;
while (_queue.TryTake(out item, TimeSpan.FromSeconds(1))) ;
return item;
}
catch (Exception ex)
{
Console.WriteLine("Could not remove item to queue: {0}", ex);
return default(T);
}
}
public void Dispose()
{
// Cancel all tokens
_tokenSource.Cancel();
// Wait for all tasks complete
Task.WaitAll(_tasks.ToArray());
_queue.Dispose();
}
}
摘要class实施
public class Processor: BaseProcessor<QueueItem>
{
private int _counter;
public Processor(int producers, int consumers): base(producers, consumers)
{
_counter = 0;
Setup();
}
protected override void Produce(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var queueItem = new QueueItem()
{
Id = _counter,
Timestamp = DateTime.Now
};
//Enqueue(queueItem);
_queue.Add(queueItem);
Console.WriteLine("Enqueued: {0}", _counter);
_counter++;
Thread.Sleep(3000);
}
}
protected override void Consume(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var item = Dequeue();
//var item = _queue.Take();
Console.WriteLine("Dequeued: {0}", item.Id);
}
}
}
我发现了原因,它是我从我正在使用的 link 中获取的一些代码的实现。 https://www.infoworld.com/article/3090215/how-to-work-with-blockingcollection-in-c.html
while (_queue.TryTake(out item, TimeSpan.FromSeconds(1)));正在转储队列中的所有项目,它是 运行 直到队列实际上是空的。从教程中我推测它是 运行 虽然有一些东西可以尝试和采取。
代我盲目复制粘贴问题