信号量的多线程问题
Multithreading issue with semaphore
我需要一段代码,该代码允许基于参数键同时仅由 1 个线程执行:
private static readonly ConcurrentDictionary<string, SemaphoreSlim> Semaphores = new();
private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
var semaphore = Semaphores.GetOrAdd(valueKey, s => new SemaphoreSlim(1, 1));
try
{
await semaphore.WaitAsync();
return await valueAction();
}
finally
{
semaphore.Release(); // Exception here - System.ObjectDisposedException
if (semaphore.CurrentCount > 0 && Semaphores.TryRemove(valueKey, out semaphore))
{
semaphore?.Dispose();
}
}
}
我有时会收到错误消息:
The semaphore has been disposed. : System.ObjectDisposedException: The semaphore has been disposed.
at System.Threading.SemaphoreSlim.CheckDispose()
at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
at Project.GetValueWithBlockAsync[TModel](String valueKey, Func`1 valueAction)
这里我能想到的所有情况都是线程安全的。请帮忙,我错过了什么案例?
这里有一个线程竞争,另一个任务正在尝试获取相同的信号量,并在您 Release
时获取它 - 即另一个线程正在等待 semaphore.WaitAsync()
。针对 CurrentCount
的检查是一个竞争条件,它可能会根据时间选择任何一种方式。 TryRemove
的检查无关紧要,因为竞争线程 已经 发出了信号量 - 毕竟,它正在等待 WaitAsync()
.
正如评论中所讨论的,这里有几个竞争条件。
- 线程 1 持有锁,线程 2 正在等待
WaitAsync()
。线程 1 释放锁,然后在线程 2 能够获取它之前检查 semaphore.CurrentCount
。
- 线程 1 持有锁,释放它,并检查
semaphore.CurrentCount
哪个通过了。线程 2 输入 GetValueWithBlockAsync
,调用 Semaphores.GetOrAdd
并获取信号量。线程 1 然后调用 Semaphores.TryRemove
并处理信号量。
您确实需要锁定从 Semaphores
中删除条目的决定——没有办法解决这个问题。您也没有办法跟踪是否有任何线程从 Semaphores
获取了信号量(并且当前正在等待它,或者还没有到达那个点)。
一种方法是做这样的事情:拥有一个在每个人之间共享的锁,但只有在 fetching/creating 信号量时才需要,并决定是否处置它。我们手动跟踪当前有多少线程对特定信号量感兴趣。当线程释放信号量时,它会获取共享锁以检查当前是否有其他人对该信号量感兴趣,只有在没有其他人感兴趣时才释放它。
private static readonly object semaphoresLock = new();
private static readonly Dictionary<string, State> semaphores = new();
private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
State state;
lock (semaphoresLock)
{
if (!semaphores.TryGetValue(valueKey, out state))
{
state = new();
semaphores[valueKey] = state;
}
state.Count++;
}
try
{
await state.Semaphore.WaitAsync();
return await valueAction();
}
finally
{
state.Semaphore.Release();
lock (semaphoresLock)
{
state.Count--;
if (state.Count == 0)
{
semaphores.Remove(valueKey);
state.Semaphore.Dispose();
}
}
}
}
private class State
{
public int Count { get; set; }
public SemaphoreSlim Semaphore { get; } = new(1, 1);
}
当然,另一种选择是让 Semaphores
成长。也许你有一个周期性的操作来清除任何没有被使用的东西,但是这当然需要保护以确保线程不会突然对正在被清除的信号量感兴趣。
我需要一段代码,该代码允许基于参数键同时仅由 1 个线程执行:
private static readonly ConcurrentDictionary<string, SemaphoreSlim> Semaphores = new();
private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
var semaphore = Semaphores.GetOrAdd(valueKey, s => new SemaphoreSlim(1, 1));
try
{
await semaphore.WaitAsync();
return await valueAction();
}
finally
{
semaphore.Release(); // Exception here - System.ObjectDisposedException
if (semaphore.CurrentCount > 0 && Semaphores.TryRemove(valueKey, out semaphore))
{
semaphore?.Dispose();
}
}
}
我有时会收到错误消息:
The semaphore has been disposed. : System.ObjectDisposedException: The semaphore has been disposed.
at System.Threading.SemaphoreSlim.CheckDispose()
at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
at Project.GetValueWithBlockAsync[TModel](String valueKey, Func`1 valueAction)
这里我能想到的所有情况都是线程安全的。请帮忙,我错过了什么案例?
这里有一个线程竞争,另一个任务正在尝试获取相同的信号量,并在您 Release
时获取它 - 即另一个线程正在等待 semaphore.WaitAsync()
。针对 CurrentCount
的检查是一个竞争条件,它可能会根据时间选择任何一种方式。 TryRemove
的检查无关紧要,因为竞争线程 已经 发出了信号量 - 毕竟,它正在等待 WaitAsync()
.
正如评论中所讨论的,这里有几个竞争条件。
- 线程 1 持有锁,线程 2 正在等待
WaitAsync()
。线程 1 释放锁,然后在线程 2 能够获取它之前检查semaphore.CurrentCount
。 - 线程 1 持有锁,释放它,并检查
semaphore.CurrentCount
哪个通过了。线程 2 输入GetValueWithBlockAsync
,调用Semaphores.GetOrAdd
并获取信号量。线程 1 然后调用Semaphores.TryRemove
并处理信号量。
您确实需要锁定从 Semaphores
中删除条目的决定——没有办法解决这个问题。您也没有办法跟踪是否有任何线程从 Semaphores
获取了信号量(并且当前正在等待它,或者还没有到达那个点)。
一种方法是做这样的事情:拥有一个在每个人之间共享的锁,但只有在 fetching/creating 信号量时才需要,并决定是否处置它。我们手动跟踪当前有多少线程对特定信号量感兴趣。当线程释放信号量时,它会获取共享锁以检查当前是否有其他人对该信号量感兴趣,只有在没有其他人感兴趣时才释放它。
private static readonly object semaphoresLock = new();
private static readonly Dictionary<string, State> semaphores = new();
private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
State state;
lock (semaphoresLock)
{
if (!semaphores.TryGetValue(valueKey, out state))
{
state = new();
semaphores[valueKey] = state;
}
state.Count++;
}
try
{
await state.Semaphore.WaitAsync();
return await valueAction();
}
finally
{
state.Semaphore.Release();
lock (semaphoresLock)
{
state.Count--;
if (state.Count == 0)
{
semaphores.Remove(valueKey);
state.Semaphore.Dispose();
}
}
}
}
private class State
{
public int Count { get; set; }
public SemaphoreSlim Semaphore { get; } = new(1, 1);
}
当然,另一种选择是让 Semaphores
成长。也许你有一个周期性的操作来清除任何没有被使用的东西,但是这当然需要保护以确保线程不会突然对正在被清除的信号量感兴趣。