资源池的正确实现方式

Correct way to implement a resource pool

我正在尝试实现一些管理资源池的东西,这样调用代码就可以请求一个对象,如果可用,就会从池中获得一个对象,否则就会等待。但是,我无法使同步正常工作。我池中的 class 是这样的(其中 autoEvent 是一个 AutoResetEvent 最初设置为信号:

public Foo GetFooFromPool()
{
    autoEvent.WaitOne();
    var foo = Pool.FirstOrDefault(p => !p.InUse);
    if (foo != null)
    {
        foo.InUse = true;
        autoEvent.Set();
        return foo;
    }
    else if (Pool.Count < Capacity)
    {
        System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
        foo = new Foo() { InUse = true };
        Pool.Add(foo);
        autoEvent.Set();
        return foo;
    }
    else
    {
        return GetFooFromPool();
    }
}

public void ReleaseFoo(Foo p)
{
    p.InUse = false;
    autoEvent.Set();
}

这个想法是当你调用 GetFooFromPool 时,你等到有信号,然后你尝试找到一个现有的 Foo 未被使用。如果找到一个,我们将其设置为 InUse,然后发出信号以便其他线程可以继续。如果找不到,我们会检查池是否已满。如果没有,我们创建一个新的 Foo,将其添加到池中并再次发出信号。如果这两个条件都不满足,我们将通过再次调用 GetFooFromPool 再次等待。

现在在 ReleaseFoo 中,我们只是将 InUse 设置回 false,并向在 GetFooFromPool 中等待的下一个线程发出信号(如果有)尝试获取 Foo .

问题似乎出在我管理池的大小上。容量为 5,我以 6 Foo 结束。我可以在我的调试行中看到 count 0 出现了几次并且 count 1 也可能出现了几次。很明显我有多个线程进入块,据我所知,他们不应该能够。

我做错了什么?

编辑:像这样的双重检查锁:

else if (Pool.Count < Capacity)
{
    lock(locker)
    {
        if (Pool.Count < Capacity)
        {
            System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
            foo = new Foo() { InUse = true };
            Pool.Add(foo);
            autoEvent.Set();
            return foo;
        }
    }
} 

似乎确实解决了问题,但我不确定这是最优雅的方法。

您的操作存在一些问题,但您的特定竞争条件可能是由如下情况引起的。假设你有一个容量。

1) 池中有一个未使用的项目。

2) 线程 #1 获取它并发出事件信号。

3) 线程 #2 找不到可用事件并进入容量块。 它还没有添加项目。

4) 线程 #1 returns 将项目放入池中并向事件发出信号。

5) 使用另外两个线程(例如#3、#4)重复步骤 1、2 和 3。

6) 线程 #2 向池中添加一个项目。

7) 线程 #4 向池中添加一个项目。

容量为 1 的池中现在有两个项目。

但是,您的实施还有其他潜在问题。

  • 根据您的 Pool.Count 和 Add() 的同步方式,您可能看不到最新值。
  • 您可能有多个线程获取 同一个未使用的项目
  • 使用 AutoResetEvent 控制访问会使自己面临难以发现的问题(例如这个问题),因为您正在尝试使用无锁解决方案,而不是仅仅获取锁并使用 Monitor.Wait() 和 Monitor.Pulse() 为此目的。

正如评论中已经提到的,计数信号量是您的朋友。 将它与并发堆栈结合起来,您将获得一个非常简单、线程安全的实现,您仍然可以在其中延迟分配您的池项目。

下面的基本实现提供了这种方法的示例。请注意,这里的另一个优点是您不需要 "contaminate" 带有 InUse 成员的池项目作为标记来跟踪东西。

请注意,作为微优化,在这种情况下,堆栈优于队列,因为它将提供池中最近返回的实例,该实例可能仍在例如一级缓存。

public class GenericConcurrentPool<T> : IDisposable where T : class
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentStack<T> _itemsStack;
    private readonly Action<T> _onDisposeItem;
    private readonly Func<T> _factory;

    public GenericConcurrentPool(int capacity, Func<T> factory, Action<T> onDisposeItem = null)
    {
        _itemsStack = new ConcurrentStack<T>(new T[capacity]);
        _factory = factory;
        _onDisposeItem = onDisposeItem;
        _sem = new SemaphoreSlim(capacity);
    }

    public async Task<T> CheckOutAsync()
    {
        await _sem.WaitAsync();
        return Pop();
    }

    public T CheckOut()
    {
        _sem.Wait();
        return Pop();
    }

    public void CheckIn(T item)
    {
        Push(item);
        _sem.Release();
    }

    public void Dispose()
    {
        _sem.Dispose();
        if (_onDisposeItem != null)
        {
            T item;
            while (_itemsStack.TryPop(out item))
            {
                if (item != null)
                    _onDisposeItem(item);
            }
        }
    }

    private T Pop()
    {
        T item;
        var result = _itemsStack.TryPop(out item);
        Debug.Assert(result);
        return item ?? _factory();
    }

    private void Push(T item)
    {
        Debug.Assert(item != null);
        _itemsStack.Push(item);
    }
}