超时和特定条件下的消费者
Consumer with timeout and under specific condition
BlockingCollection<T>
class 提供了一种简单的方法来实现 producer/consumer 模式,但遗憾的是没有我需要的功能。它允许我在等待使用元素时设置超时,但不提供限制从集合中删除哪个项目的方法。
我如何实现类似于 BlockingCollection<T>
的 class,但它允许我指定应该获取项目的条件?
例如:我只需要取 Bar
项且 Amount
等于特定值:
public class Bar
{
public Int32 Amount { get; set; }
}
public class Program
{
public static void Main()
{
ToDoCollection<Bar> ToDoCollection = new ToDoCollection<Bar>();
int timeout = 10000;
// this doesn't work, that's why I'm asking for your help
Bar value = ToDoCollection.TryTake().Where(p => p.Amount != 5);
// Here, I need to wait for 10s trying to take item from blockingCollection
// item, that will follow specific condition: Bar.Amount has to be greater then zero
}
}
如果我没理解错的话,您想要一个具有以下行为的集合:
- 允许线程尝试检索与特定条件匹配的项目,并会阻塞线程直到该项目出现。
- 允许线程为第 1 点中描述的操作指定超时。
现有的 BlockingCollection
class 显然与问题毫无关系。
您可以实现自己的集合类型,添加您需要的任何特定功能。例如:
class BlockingPredicateCollection<T>
{
private readonly object _lock = new object();
private readonly List<T> _list = new List<T>();
public void Add(T t)
{
lock (_lock)
{
_list.Add(t);
// Wake any waiting threads, so they can check if the element they
// are waiting for is now present.
Monitor.PulseAll(_lock);
}
}
public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
{
Stopwatch sw = Stopwatch.StartNew();
lock (_lock)
{
int index;
while ((index = _list.FindIndex(predicate)) < 0)
{
TimeSpan elapsed = sw.Elapsed;
if (elapsed > timeout ||
!Monitor.Wait(_lock, timeout - elapsed))
{
t = default(T);
return false;
}
}
t = _list[index];
_list.RemoveAt(index);
return true;
}
}
}
然后,例如:
BlockingPredicateCollection<Bar> toDoCollection = new BlockingPredicateCollection<Bar>();
int timeout = 10000;
Bar value;
if (toDoCollection.TryTake(out value,
p => p.Amount != 5, TimeSpan.FromMilliseconds(timeout)))
{
// do something with "value"
}
BlockingCollection<T>
class 提供了一种简单的方法来实现 producer/consumer 模式,但遗憾的是没有我需要的功能。它允许我在等待使用元素时设置超时,但不提供限制从集合中删除哪个项目的方法。
我如何实现类似于 BlockingCollection<T>
的 class,但它允许我指定应该获取项目的条件?
例如:我只需要取 Bar
项且 Amount
等于特定值:
public class Bar
{
public Int32 Amount { get; set; }
}
public class Program
{
public static void Main()
{
ToDoCollection<Bar> ToDoCollection = new ToDoCollection<Bar>();
int timeout = 10000;
// this doesn't work, that's why I'm asking for your help
Bar value = ToDoCollection.TryTake().Where(p => p.Amount != 5);
// Here, I need to wait for 10s trying to take item from blockingCollection
// item, that will follow specific condition: Bar.Amount has to be greater then zero
}
}
如果我没理解错的话,您想要一个具有以下行为的集合:
- 允许线程尝试检索与特定条件匹配的项目,并会阻塞线程直到该项目出现。
- 允许线程为第 1 点中描述的操作指定超时。
现有的 BlockingCollection
class 显然与问题毫无关系。
您可以实现自己的集合类型,添加您需要的任何特定功能。例如:
class BlockingPredicateCollection<T>
{
private readonly object _lock = new object();
private readonly List<T> _list = new List<T>();
public void Add(T t)
{
lock (_lock)
{
_list.Add(t);
// Wake any waiting threads, so they can check if the element they
// are waiting for is now present.
Monitor.PulseAll(_lock);
}
}
public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
{
Stopwatch sw = Stopwatch.StartNew();
lock (_lock)
{
int index;
while ((index = _list.FindIndex(predicate)) < 0)
{
TimeSpan elapsed = sw.Elapsed;
if (elapsed > timeout ||
!Monitor.Wait(_lock, timeout - elapsed))
{
t = default(T);
return false;
}
}
t = _list[index];
_list.RemoveAt(index);
return true;
}
}
}
然后,例如:
BlockingPredicateCollection<Bar> toDoCollection = new BlockingPredicateCollection<Bar>();
int timeout = 10000;
Bar value;
if (toDoCollection.TryTake(out value,
p => p.Amount != 5, TimeSpan.FromMilliseconds(timeout)))
{
// do something with "value"
}