信号量的多线程问题

Multithreading issue with semaphore

我需要一段代码,该代码允许基于参数键同时仅由 1 个线程执行:

    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Semaphores = new();

    private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
    {
        var semaphore = Semaphores.GetOrAdd(valueKey, s => new SemaphoreSlim(1, 1));

        try
        {
            await semaphore.WaitAsync();

            return await valueAction();
        }
        finally
        {
            semaphore.Release(); // Exception here - System.ObjectDisposedException
            if (semaphore.CurrentCount > 0 && Semaphores.TryRemove(valueKey, out semaphore))
            {
                semaphore?.Dispose();
            }
        }
    }

我有时会收到错误消息:

The semaphore has been disposed. : System.ObjectDisposedException: The semaphore has been disposed.
   at System.Threading.SemaphoreSlim.CheckDispose()
   at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
   at Project.GetValueWithBlockAsync[TModel](String valueKey, Func`1 valueAction)

这里我能想到的所有情况都是线程安全的。请帮忙,我错过了什么案例?

这里有一个线程竞争,另一个任务正在尝试获取相同的信号量,并在您 Release 时获取它 - 即另一个线程正在等待 semaphore.WaitAsync()。针对 CurrentCount 的检查是一个竞争条件,它可能会根据时间选择任何一种方式。 TryRemove 的检查无关紧要,因为竞争线程 已经 发出了信号量 - 毕竟,它正在等待 WaitAsync().

正如评论中所讨论的,这里有几个竞争条件。

  1. 线程 1 持有锁,线程 2 正在等待 WaitAsync()。线程 1 释放锁,然后在线程 2 能够获取它之前检查 semaphore.CurrentCount
  2. 线程 1 持有锁,释放它,并检查 semaphore.CurrentCount 哪个通过了。线程 2 输入 GetValueWithBlockAsync,调用 Semaphores.GetOrAdd 并获取信号量。线程 1 然后调用 Semaphores.TryRemove 并处理信号量。

您确实需要锁定从 Semaphores 中删除条目的决定——没有办法解决这个问题。您也没有办法跟踪是否有任何线程从 Semaphores 获取了信号量(并且当前正在等待它,或者还没有到达那个点)。

一种方法是做这样的事情:拥有一个在每个人之间共享的锁,但只有在 fetching/creating 信号量时才需要,并决定是否处置它。我们手动跟踪当前有多少线程对特定信号量感兴趣。当线程释放信号量时,它会获取共享锁以检查当前是否有其他人对该信号量感兴趣,只有在没有其他人感兴趣时才释放它。

private static readonly object semaphoresLock = new();
private static readonly Dictionary<string, State> semaphores = new();

private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
    State state;
    lock (semaphoresLock)
    {
        if (!semaphores.TryGetValue(valueKey, out state))
        {
            state = new();
            semaphores[valueKey] = state;
        }
        
        state.Count++;
    }

    try
    {
        await state.Semaphore.WaitAsync();

        return await valueAction();
    }
    finally
    {
        state.Semaphore.Release();
        lock (semaphoresLock)
        {
            state.Count--;
            if (state.Count == 0)
            {
                semaphores.Remove(valueKey);
                state.Semaphore.Dispose();
            }
        }
    }
}

private class State
{
    public int Count { get; set; }
    public SemaphoreSlim Semaphore { get; } = new(1, 1);
}

当然,另一种选择是让 Semaphores 成长。也许你有一个周期性的操作来清除任何没有被使用的东西,但是这当然需要保护以确保线程不会突然对正在被清除的信号量感兴趣。