超时和特定条件下的消费者

Consumer with timeout and under specific condition

BlockingCollection<T> class 提供了一种简单的方法来实现 producer/consumer 模式,但遗憾的是没有我需要的功能。它允许我在等待使用元素时设置超时,但不提供限制从集合中删除哪个项目的方法。

我如何实现类似于 BlockingCollection<T> 的 class,但它允许我指定应该获取项目的条件?

例如:我只需要取 Bar 项且 Amount 等于特定值:

public class Bar 
{
    public Int32 Amount { get; set; }
}

public class Program 
{
    public  static void Main()
    {
        ToDoCollection<Bar> ToDoCollection = new ToDoCollection<Bar>();
        int timeout = 10000;

        // this doesn't work, that's why I'm asking for your help
        Bar value = ToDoCollection.TryTake().Where(p => p.Amount != 5);

        // Here, I need to wait for 10s trying to take item from blockingCollection
        // item, that will follow specific condition: Bar.Amount has to be greater then zero
    }
}

如果我没理解错的话,您想要一个具有以下行为的集合:

  1. 允许线程尝试检索与特定条件匹配的项目,并会阻塞线程直到该项目出现。
  2. 允许线程为第 1 点中描述的操作指定超时。

现有的 BlockingCollection class 显然与问题毫无关系。

您可以实现自己的集合类型,添加您需要的任何特定功能。例如:

class BlockingPredicateCollection<T>
{
    private readonly object _lock = new object();
    private readonly List<T> _list = new List<T>();

    public void Add(T t)
    {
        lock (_lock)
        {
            _list.Add(t);

            // Wake any waiting threads, so they can check if the element they
            // are waiting for is now present.
            Monitor.PulseAll(_lock);
        }
    }

    public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
    {
        Stopwatch sw = Stopwatch.StartNew();

        lock (_lock)
        {
            int index;

            while ((index = _list.FindIndex(predicate)) < 0)
            {
                TimeSpan elapsed = sw.Elapsed;

                if (elapsed > timeout ||
                    !Monitor.Wait(_lock, timeout - elapsed))
                {
                    t = default(T);
                    return false;
                }
            }

            t = _list[index];
            _list.RemoveAt(index);
            return true;
        }
    }
}

然后,例如:

BlockingPredicateCollection<Bar> toDoCollection = new BlockingPredicateCollection<Bar>();
int timeout = 10000;
Bar value;

if (toDoCollection.TryTake(out value,
    p => p.Amount != 5, TimeSpan.FromMilliseconds(timeout)))
{
    // do something with "value"
}