资源池的正确实现方式
Correct way to implement a resource pool
我正在尝试实现一些管理资源池的东西,这样调用代码就可以请求一个对象,如果可用,就会从池中获得一个对象,否则就会等待。但是,我无法使同步正常工作。我池中的 class 是这样的(其中 autoEvent
是一个 AutoResetEvent
最初设置为信号:
public Foo GetFooFromPool()
{
autoEvent.WaitOne();
var foo = Pool.FirstOrDefault(p => !p.InUse);
if (foo != null)
{
foo.InUse = true;
autoEvent.Set();
return foo;
}
else if (Pool.Count < Capacity)
{
System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
foo = new Foo() { InUse = true };
Pool.Add(foo);
autoEvent.Set();
return foo;
}
else
{
return GetFooFromPool();
}
}
public void ReleaseFoo(Foo p)
{
p.InUse = false;
autoEvent.Set();
}
这个想法是当你调用 GetFooFromPool
时,你等到有信号,然后你尝试找到一个现有的 Foo
未被使用。如果找到一个,我们将其设置为 InUse
,然后发出信号以便其他线程可以继续。如果找不到,我们会检查池是否已满。如果没有,我们创建一个新的 Foo
,将其添加到池中并再次发出信号。如果这两个条件都不满足,我们将通过再次调用 GetFooFromPool
再次等待。
现在在 ReleaseFoo
中,我们只是将 InUse
设置回 false,并向在 GetFooFromPool
中等待的下一个线程发出信号(如果有)尝试获取 Foo
.
问题似乎出在我管理池的大小上。容量为 5
,我以 6
Foo
结束。我可以在我的调试行中看到 count 0
出现了几次并且 count 1
也可能出现了几次。很明显我有多个线程进入块,据我所知,他们不应该能够。
我做错了什么?
编辑:像这样的双重检查锁:
else if (Pool.Count < Capacity)
{
lock(locker)
{
if (Pool.Count < Capacity)
{
System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
foo = new Foo() { InUse = true };
Pool.Add(foo);
autoEvent.Set();
return foo;
}
}
}
似乎确实解决了问题,但我不确定这是最优雅的方法。
您的操作存在一些问题,但您的特定竞争条件可能是由如下情况引起的。假设你有一个容量。
1) 池中有一个未使用的项目。
2) 线程 #1 获取它并发出事件信号。
3) 线程 #2 找不到可用事件并进入容量块。 它还没有添加项目。
4) 线程 #1 returns 将项目放入池中并向事件发出信号。
5) 使用另外两个线程(例如#3、#4)重复步骤 1、2 和 3。
6) 线程 #2 向池中添加一个项目。
7) 线程 #4 向池中添加一个项目。
容量为 1 的池中现在有两个项目。
但是,您的实施还有其他潜在问题。
- 根据您的 Pool.Count 和 Add() 的同步方式,您可能看不到最新值。
- 您可能有多个线程获取 同一个未使用的项目。
- 使用 AutoResetEvent 控制访问会使自己面临难以发现的问题(例如这个问题),因为您正在尝试使用无锁解决方案,而不是仅仅获取锁并使用 Monitor.Wait() 和 Monitor.Pulse() 为此目的。
正如评论中已经提到的,计数信号量是您的朋友。
将它与并发堆栈结合起来,您将获得一个非常简单、线程安全的实现,您仍然可以在其中延迟分配您的池项目。
下面的基本实现提供了这种方法的示例。请注意,这里的另一个优点是您不需要 "contaminate" 带有 InUse
成员的池项目作为标记来跟踪东西。
请注意,作为微优化,在这种情况下,堆栈优于队列,因为它将提供池中最近返回的实例,该实例可能仍在例如一级缓存。
public class GenericConcurrentPool<T> : IDisposable where T : class
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentStack<T> _itemsStack;
private readonly Action<T> _onDisposeItem;
private readonly Func<T> _factory;
public GenericConcurrentPool(int capacity, Func<T> factory, Action<T> onDisposeItem = null)
{
_itemsStack = new ConcurrentStack<T>(new T[capacity]);
_factory = factory;
_onDisposeItem = onDisposeItem;
_sem = new SemaphoreSlim(capacity);
}
public async Task<T> CheckOutAsync()
{
await _sem.WaitAsync();
return Pop();
}
public T CheckOut()
{
_sem.Wait();
return Pop();
}
public void CheckIn(T item)
{
Push(item);
_sem.Release();
}
public void Dispose()
{
_sem.Dispose();
if (_onDisposeItem != null)
{
T item;
while (_itemsStack.TryPop(out item))
{
if (item != null)
_onDisposeItem(item);
}
}
}
private T Pop()
{
T item;
var result = _itemsStack.TryPop(out item);
Debug.Assert(result);
return item ?? _factory();
}
private void Push(T item)
{
Debug.Assert(item != null);
_itemsStack.Push(item);
}
}
我正在尝试实现一些管理资源池的东西,这样调用代码就可以请求一个对象,如果可用,就会从池中获得一个对象,否则就会等待。但是,我无法使同步正常工作。我池中的 class 是这样的(其中 autoEvent
是一个 AutoResetEvent
最初设置为信号:
public Foo GetFooFromPool()
{
autoEvent.WaitOne();
var foo = Pool.FirstOrDefault(p => !p.InUse);
if (foo != null)
{
foo.InUse = true;
autoEvent.Set();
return foo;
}
else if (Pool.Count < Capacity)
{
System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
foo = new Foo() { InUse = true };
Pool.Add(foo);
autoEvent.Set();
return foo;
}
else
{
return GetFooFromPool();
}
}
public void ReleaseFoo(Foo p)
{
p.InUse = false;
autoEvent.Set();
}
这个想法是当你调用 GetFooFromPool
时,你等到有信号,然后你尝试找到一个现有的 Foo
未被使用。如果找到一个,我们将其设置为 InUse
,然后发出信号以便其他线程可以继续。如果找不到,我们会检查池是否已满。如果没有,我们创建一个新的 Foo
,将其添加到池中并再次发出信号。如果这两个条件都不满足,我们将通过再次调用 GetFooFromPool
再次等待。
现在在 ReleaseFoo
中,我们只是将 InUse
设置回 false,并向在 GetFooFromPool
中等待的下一个线程发出信号(如果有)尝试获取 Foo
.
问题似乎出在我管理池的大小上。容量为 5
,我以 6
Foo
结束。我可以在我的调试行中看到 count 0
出现了几次并且 count 1
也可能出现了几次。很明显我有多个线程进入块,据我所知,他们不应该能够。
我做错了什么?
编辑:像这样的双重检查锁:
else if (Pool.Count < Capacity)
{
lock(locker)
{
if (Pool.Count < Capacity)
{
System.Diagnostics.Debug.WriteLine("count {0}\t capacity {1}", Pool.Count, Capacity);
foo = new Foo() { InUse = true };
Pool.Add(foo);
autoEvent.Set();
return foo;
}
}
}
似乎确实解决了问题,但我不确定这是最优雅的方法。
您的操作存在一些问题,但您的特定竞争条件可能是由如下情况引起的。假设你有一个容量。
1) 池中有一个未使用的项目。
2) 线程 #1 获取它并发出事件信号。
3) 线程 #2 找不到可用事件并进入容量块。 它还没有添加项目。
4) 线程 #1 returns 将项目放入池中并向事件发出信号。
5) 使用另外两个线程(例如#3、#4)重复步骤 1、2 和 3。
6) 线程 #2 向池中添加一个项目。
7) 线程 #4 向池中添加一个项目。
容量为 1 的池中现在有两个项目。
但是,您的实施还有其他潜在问题。
- 根据您的 Pool.Count 和 Add() 的同步方式,您可能看不到最新值。
- 您可能有多个线程获取 同一个未使用的项目。
- 使用 AutoResetEvent 控制访问会使自己面临难以发现的问题(例如这个问题),因为您正在尝试使用无锁解决方案,而不是仅仅获取锁并使用 Monitor.Wait() 和 Monitor.Pulse() 为此目的。
正如评论中已经提到的,计数信号量是您的朋友。 将它与并发堆栈结合起来,您将获得一个非常简单、线程安全的实现,您仍然可以在其中延迟分配您的池项目。
下面的基本实现提供了这种方法的示例。请注意,这里的另一个优点是您不需要 "contaminate" 带有 InUse
成员的池项目作为标记来跟踪东西。
请注意,作为微优化,在这种情况下,堆栈优于队列,因为它将提供池中最近返回的实例,该实例可能仍在例如一级缓存。
public class GenericConcurrentPool<T> : IDisposable where T : class
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentStack<T> _itemsStack;
private readonly Action<T> _onDisposeItem;
private readonly Func<T> _factory;
public GenericConcurrentPool(int capacity, Func<T> factory, Action<T> onDisposeItem = null)
{
_itemsStack = new ConcurrentStack<T>(new T[capacity]);
_factory = factory;
_onDisposeItem = onDisposeItem;
_sem = new SemaphoreSlim(capacity);
}
public async Task<T> CheckOutAsync()
{
await _sem.WaitAsync();
return Pop();
}
public T CheckOut()
{
_sem.Wait();
return Pop();
}
public void CheckIn(T item)
{
Push(item);
_sem.Release();
}
public void Dispose()
{
_sem.Dispose();
if (_onDisposeItem != null)
{
T item;
while (_itemsStack.TryPop(out item))
{
if (item != null)
_onDisposeItem(item);
}
}
}
private T Pop()
{
T item;
var result = _itemsStack.TryPop(out item);
Debug.Assert(result);
return item ?? _factory();
}
private void Push(T item)
{
Debug.Assert(item != null);
_itemsStack.Push(item);
}
}