基于key的异步锁

Asynchronous locking based on a key

我正在尝试解决我的 ImageProcessor 库中出现的一个问题(见此处):在向缓存添加项目时,会间歇性地出现文件访问错误。

System.IO.IOException: The process cannot access the file 'D:\home\site\wwwroot\app_data\cache[=12=]\f5f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp' because it is being used by another process.

我编写了一个类,用于根据散列后的 URL 生成的键执行异步锁定,但我的实现似乎遗漏了什么。

我的锁定类:

/// <summary>
/// Per-key locking: one <see cref="SemaphoreSlim"/> per key, kept in a static dictionary
/// that is therefore shared by every instance of this class.
/// </summary>
/// <remarks>
/// NOTE(review): this implementation does not provide mutual exclusion per key. The
/// releaser removes the semaphore from the dictionary before releasing it, so a caller
/// that arrives afterwards creates a brand-new semaphore while an earlier waiter may be
/// woken on the old one. Both then hold "the lock" for the same key at the same time.
/// </remarks>
public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// The collection of semaphore slims, keyed by the caller-supplied lock key.
    /// </summary>
    private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
                            = new ConcurrentDictionary<object, SemaphoreSlim>();

    /// <summary>
    /// Synchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// An <see cref="IDisposable"/> whose Dispose runs the release action for <paramref name="key"/>.
    /// </returns>
    public IDisposable Lock(object key)
    {
        // Release action: remove the key's semaphore from the dictionary, then release and dispose it.
        // NOTE(review): removal happens before Release even if other callers already fetched
        // this same semaphore and are blocked on it; the next GetOrAdd for the key then adds a
        // new semaphore. TryRemove also matches by key only, so it can remove (and then
        // release/dispose) a semaphore that a *later* caller added for the same key.
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        // NOTE(review): the `new SemaphoreSlim(1, 1)` argument is evaluated on every call,
        // even when the key already has a semaphore; in that case the new instance is discarded.
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
        semaphore.Wait();
        return releaser;
    }

    /// <summary>
    /// Asynchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// A task that completes with an <see cref="IDisposable"/> releaser once the key's
    /// semaphore has been acquired.
    /// </returns>
    public Task<IDisposable> LockAsync(object key)
    {
        // Same release action (and the same remove-before-release problem) as in Lock.
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        // Pre-completed task used when the wait below finishes synchronously.
        Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));

        Task waitTask = semaphore.WaitAsync();

        // Fast path when the semaphore was acquired immediately; otherwise map the wait's
        // completion to the releaser via a synchronous continuation.
        return waitTask.IsCompleted
                   ? releaserTask
                   : waitTask.ContinueWith(
                       (_, r) => (IDisposable)r,
                       releaser,
                       CancellationToken.None,
                       TaskContinuationOptions.ExecuteSynchronously,
                       TaskScheduler.Default);
    }

    /// <summary>
    /// The disposable scope: invokes a close action with the captured key when disposed.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        /// <summary>
        /// The key passed to the close action.
        /// </summary>
        private readonly object key;

        /// <summary>
        /// The close scope action.
        /// </summary>
        private readonly Action<object> closeScopeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="DisposableScope"/> class.
        /// </summary>
        /// <param name="key">
        /// The key.
        /// </param>
        /// <param name="closeScopeAction">
        /// The close scope action.
        /// </param>
        public DisposableScope(object key, Action<object> closeScopeAction)
        {
            this.key = key;
            this.closeScopeAction = closeScopeAction;
        }

        /// <summary>
        /// Disposes the scope. NOTE(review): there is no guard, so every call runs the
        /// close action again.
        /// </summary>
        public void Dispose()
        {
            this.closeScopeAction(this.key);
        }
    }
}

用法 - 在 HttpModule 中

// Lock instance held by the module. (AsyncDuplicateLock keeps its semaphores in a static
// dictionary, so all instances share the same per-key locks.)
private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();

// Intended to serialize work per cached file path: requests for the same cachedPath should
// run the body one at a time, while different paths do not block each other.
using (await this.locker.LockAsync(cachedPath))
{
    // Process and save a cached image.
}

谁能指出我哪里出错了?我担心我误解了一些基本的东西。

该库的完整源代码托管在 GitHub 上(见此处)。

对于给定的键:

  1. 线程 1 调用 GetOrAdd,添加一个新的信号量,并通过 Wait 获取它。
  2. 线程 2 调用 GetOrAdd,拿到已有的信号量,并阻塞在 Wait 上。
  3. 线程 1 释放信号量,但这一步发生在调用 TryRemove 之后,而 TryRemove 已经把该信号量从字典中删除了。
  4. 线程 2 现在获取了信号量。
  5. 线程 3 为与线程 1、2 相同的键调用 GetOrAdd。线程 2 仍然持有信号量,但该信号量已不在字典中,因此线程 3 创建了一个新的信号量,结果线程 2 和线程 3 同时访问了同一个受保护的资源。

你需要调整一下逻辑:只有在没有任何等待者时,才应该把信号量从字典中删除。

这是一个潜在的解决方案,去掉了异步部分:

/// <summary>
/// Synchronous per-key mutual exclusion. An uncontended acquisition allocates no semaphore;
/// one is created lazily only when a second caller has to wait for the same key.
/// </summary>
public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// Reference-counted lock state for a single key. The count covers the current holder
    /// plus every caller queued behind it. Once the count reaches zero the instance is dead
    /// for good: <see cref="TryEnter"/> refuses to revive it, so a new instance is installed.
    /// </summary>
    private class LockInfo
    {
        private SemaphoreSlim sem;
        private int waiterCount;

        public LockInfo()
        {
            sem = null;
            // Whoever creates a LockInfo is its first holder.
            waiterCount = 1;
        }

        // Lazily create the semaphore. It starts at 0 because the holder never waits on it;
        // only the callers queued behind the holder do.
        private SemaphoreSlim Semaphore
        {
            get
            {
                var s = Volatile.Read(ref sem);
                if (s == null)
                {
                    s = new SemaphoreSlim(0, 1);
                    // CompareExchange(ref location, value, comparand): store s only if the
                    // field is still null. Swapping the last two arguments would never store
                    // anything, and every caller would get its own private semaphore.
                    var original = Interlocked.CompareExchange(ref sem, s, null);
                    // If someone else already created a semaphore, use that one.
                    if (original != null)
                    {
                        s.Dispose();
                        return original;
                    }
                }
                return s;
            }
        }

        // Registers the caller as a waiter. Returns false, without touching the count, when
        // this instance is already dead (count 0), so a dead LockInfo can never be revived.
        public bool TryEnter()
        {
            int current = Volatile.Read(ref waiterCount);
            while (current > 0)
            {
                int observed = Interlocked.CompareExchange(ref waiterCount, current + 1, current);
                if (observed == current)
                {
                    return true;
                }
                current = observed;
            }
            return false;
        }

        // Blocks until the previous holder hands the lock over in Exit.
        public void WaitForTurn()
        {
            Semaphore.Wait();
        }

        // Returns true if this lock info is now dead and ready for removal.
        public bool Exit()
        {
            if (Interlocked.Decrement(ref waiterCount) <= 0)
            {
                return true;
            }

            // There is at least one waiter: hand the lock over to it.
            Semaphore.Release();
            return false;
        }
    }

    private static readonly ConcurrentDictionary<object, LockInfo> activeLocks = new ConcurrentDictionary<object, LockInfo>();

    /// <summary>
    /// Blocks until the lock for <paramref name="key"/> is held by the caller.
    /// </summary>
    /// <param name="key">The key to lock on (must not be null).</param>
    /// <returns>A scope that releases the lock when disposed; extra Dispose calls are ignored.</returns>
    public static IDisposable Lock(object key)
    {
        LockInfo info = AcquireInfo(key);

        return new DisposableScope(() =>
        {
            if (info.Exit())
            {
                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary
                ((ICollection<KeyValuePair<object, LockInfo>>)activeLocks)
                  .Remove(new KeyValuePair<object, LockInfo>(key, info));
            }
        });
    }

    // Returns the key's live LockInfo once the caller holds it. Dictionary operations are
    // side-effect free and never block. The blocking wait happens only after a successful
    // TryEnter, outside any dictionary call.
    private static LockInfo AcquireInfo(object key)
    {
        while (true)
        {
            LockInfo existing;
            if (activeLocks.TryGetValue(key, out existing))
            {
                if (existing.TryEnter())
                {
                    // A live info stays in the dictionary while our reference keeps its count > 0.
                    existing.WaitForTurn();
                    return existing;
                }

                // The existing info is dead; replace it unless someone else already did.
                var replacement = new LockInfo();
                if (activeLocks.TryUpdate(key, replacement, existing))
                {
                    return replacement;
                }
            }
            else
            {
                var fresh = new LockInfo();
                if (activeLocks.TryAdd(key, fresh))
                {
                    return fresh;
                }
            }
        }
    }

    private sealed class DisposableScope : IDisposable
    {
        private Action closeScopeAction;

        public DisposableScope(Action closeScopeAction)
        {
            this.closeScopeAction = closeScopeAction;
        }

        // Runs the close action at most once, so a double Dispose cannot release the lock twice.
        public void Dispose()
        {
            Interlocked.Exchange(ref this.closeScopeAction, null)?.Invoke();
        }
    }
}

正如另一位回答者所指出的,原始代码在释放信号量之前就把 SemaphoreSlim 从 ConcurrentDictionary 中删除了。因此信号量的更替过于频繁:它们在仍可能被使用时(尚未被获取,但已经从字典中取出)就被从字典中删除了。

这种 "mapping lock" 的问题在于,很难知道信号量何时不再被需要。一种选择是根本不释放(Dispose)信号量;这是最简单的解决方案,但在您的场景中可能不可接受。另一种选择是使用 ephemeron(在 .NET 中即 ConditionalWeakTable)把信号量附加到对象上,前提是信号量实际上与对象实例相关,而不是与值(如字符串)相关;不过,我认为这个选项在您的场景中同样不可接受。

所以,我们只能用比较麻烦的方式来实现。:)

有几种不同的方法可行。我认为从引用计数的角度来处理它是有意义的(引用计数字典中的每个信号量)。另外,我们想让递减计数和删除操作成为原子操作,所以我只使用一个 lock (使并发字典变得多余):

/// <summary>
/// Per-key mutual exclusion built on reference-counted <see cref="SemaphoreSlim"/>s. The
/// dictionary is static, so every instance of this class shares the same per-key locks.
/// </summary>
public sealed class AsyncDuplicateLock
{
  /// <summary>A value plus the number of callers currently holding or waiting for it.</summary>
  private sealed class RefCounted<T>
  {
    public RefCounted(T value)
    {
      RefCount = 1;
      Value = value;
    }

    public int RefCount { get; set; }
    public T Value { get; private set; }
  }

  // Guarded by lock (SemaphoreSlims), so decrementing a count and removing the entry are atomic.
  private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
                        = new Dictionary<object, RefCounted<SemaphoreSlim>>();

  // Returns the key's semaphore, creating it on first use, and counts the caller as a user of it.
  private SemaphoreSlim GetOrCreate(object key)
  {
    RefCounted<SemaphoreSlim> item;
    lock (SemaphoreSlims)
    {
      if (SemaphoreSlims.TryGetValue(key, out item))
      {
        ++item.RefCount;
      }
      else
      {
        item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
        SemaphoreSlims[key] = item;
      }
    }
    return item.Value;
  }

  /// <summary>Blocks until the lock for <paramref name="key"/> is held.</summary>
  /// <returns>A releaser; disposing it more than once has no further effect.</returns>
  public IDisposable Lock(object key)
  {
    GetOrCreate(key).Wait();
    return new Releaser { Key = key };
  }

  /// <summary>Asynchronously waits until the lock for <paramref name="key"/> is held.</summary>
  /// <returns>A task producing a releaser; disposing it more than once has no further effect.</returns>
  public async Task<IDisposable> LockAsync(object key)
  {
    await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
    return new Releaser { Key = key };
  }

  private sealed class Releaser : IDisposable
  {
    // 0 = still holding the lock, 1 = already released.
    private int disposed;

    public object Key { get; set; }

    public void Dispose()
    {
      // A second Dispose must not decrement the count or release the semaphore again.
      // Doing so would admit another caller while the lock is still held.
      if (Interlocked.Exchange(ref disposed, 1) != 0)
        return;

      RefCounted<SemaphoreSlim> item;
      lock (SemaphoreSlims)
      {
        item = SemaphoreSlims[Key];
        --item.RefCount;
        if (item.RefCount == 0)
          SemaphoreSlims.Remove(Key);
      }
      item.Value.Release();
    }
  }
}

我基于 @StephenCleary 的回答改写出了下面这个版本:

/// <summary>
/// Per-key mutual exclusion using reference-counted <see cref="SemaphoreSlim"/>s.
/// Each instance has its own dictionary, so locks are scoped to the instance.
/// </summary>
public sealed class AsyncLockList {

    // Guarded by lock (Semaphores); count updates and entry removal happen under that lock.
    readonly Dictionary<object, SemaphoreReferenceCount> Semaphores = new Dictionary<object, SemaphoreReferenceCount>();

    // Returns the key's semaphore, creating it on first use, and counts the caller as a user of it.
    SemaphoreSlim GetOrCreateSemaphore(object key) {
        lock (Semaphores) {
            if (Semaphores.TryGetValue(key, out var item)) {
                item.IncrementCount();
            } else {
                item = new SemaphoreReferenceCount();
                Semaphores[key] = item;
            }
            return item.Semaphore;
        }
    }

    /// <summary>Blocks until the lock for <paramref name="key"/> is held.</summary>
    /// <returns>A releaser; disposing it more than once has no further effect.</returns>
    public IDisposable Lock(object key) {
        GetOrCreateSemaphore(key).Wait();
        return new Releaser(Semaphores, key);
    }

    /// <summary>Asynchronously waits until the lock for <paramref name="key"/> is held.</summary>
    /// <returns>A task producing a releaser; disposing it more than once has no further effect.</returns>
    public async Task<IDisposable> LockAsync(object key) {
        await GetOrCreateSemaphore(key).WaitAsync().ConfigureAwait(false);
        return new Releaser(Semaphores, key);
    }

    // A semaphore plus the number of callers holding or waiting for it (starts at 1 for the creator).
    sealed class SemaphoreReferenceCount {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int Count { get; private set; } = 1;

        public void IncrementCount() => Count++;

        public void DecrementCount() => Count--;
    }

    sealed class Releaser : IDisposable {
        readonly Dictionary<object, SemaphoreReferenceCount> Semaphores;
        readonly object Key;
        // 0 = still holding the lock, 1 = already released.
        int disposed;

        public Releaser(Dictionary<object, SemaphoreReferenceCount> semaphores, object key) {
            Semaphores = semaphores;
            Key = key;
        }

        public void Dispose() {
            // A second Dispose must not decrement/release again: that would either throw
            // (entry already removed) or let another caller in while the lock is still held.
            if (Interlocked.Exchange(ref disposed, 1) != 0)
                return;

            lock (Semaphores) {
                var item = Semaphores[Key];
                item.DecrementCount();
                if (item.Count == 0)
                    Semaphores.Remove(Key);
                item.Semaphore.Release();
            }
        }
    }
}

这是一个 KeyedLock 类。与 Stephen Cleary 的解决方案相比,它用起来不那么方便,也更容易出错,但分配的内存更少。它在内部维护一个 SemaphoreSlim 池,某个键释放掉的信号量随后可以被任何键重用。池的容量可配置,默认为 10。

这个类并非完全零分配:每当 SemaphoreSlim 因为争用而无法同步获取信号量时,它都会分配内存(而且分配得相当多)。

锁既可以同步请求,也可以异步请求,还可以附带取消和超时。这些功能都是通过利用 SemaphoreSlim 类的现有功能提供的。

/// <summary>
/// Keyed mutual exclusion with a bounded pool of reusable <see cref="SemaphoreSlim"/>s.
/// Every Wait/WaitAsync that returns true (or completes, for the void/Task overloads)
/// must be paired with exactly one <see cref="Release"/> for the same key.
/// </summary>
public class KeyedLock<TKey>
{
    // Per key: the semaphore in use and how many callers currently hold or wait for it.
    private readonly Dictionary<TKey, (SemaphoreSlim Semaphore, int Count)> _perKey;
    // Idle semaphores available for reuse by any key; guarded by lock (_pool).
    private readonly Stack<SemaphoreSlim> _pool;
    private readonly int _poolCapacity;

    /// <param name="keyComparer">Key comparer; null selects the default comparer.</param>
    /// <param name="poolCapacity">Maximum number of idle semaphores kept for reuse.</param>
    public KeyedLock(IEqualityComparer<TKey> keyComparer = null, int poolCapacity = 10)
    {
        _perKey = new Dictionary<TKey, (SemaphoreSlim Semaphore, int Count)>(keyComparer);
        _pool = new Stack<SemaphoreSlim>(poolCapacity);
        _poolCapacity = poolCapacity;
    }

    /// <summary>Asynchronously waits for the key's lock, up to the given timeout.</summary>
    /// <returns>true if the lock was acquired; false on timeout.</returns>
    public async Task<bool> WaitAsync(TKey key, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        SemaphoreSlim gate = GetSemaphore(key);
        bool acquired = false;
        try
        {
            acquired = await gate.WaitAsync(millisecondsTimeout, cancellationToken)
                .ConfigureAwait(false);
            return acquired;
        }
        finally
        {
            // Timeout or cancellation: drop the reference taken by GetSemaphore.
            if (!acquired)
            {
                ReleaseSemaphore(key, entered: false);
            }
        }
    }

    /// <summary>Asynchronously waits for the key's lock without a timeout.</summary>
    public Task WaitAsync(TKey key, CancellationToken cancellationToken = default)
    {
        return WaitAsync(key, Timeout.Infinite, cancellationToken);
    }

    /// <summary>Synchronously waits for the key's lock, up to the given timeout.</summary>
    /// <returns>true if the lock was acquired; false on timeout.</returns>
    public bool Wait(TKey key, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        SemaphoreSlim gate = GetSemaphore(key);
        bool acquired = false;
        try
        {
            acquired = gate.Wait(millisecondsTimeout, cancellationToken);
            return acquired;
        }
        finally
        {
            if (!acquired)
            {
                ReleaseSemaphore(key, entered: false);
            }
        }
    }

    /// <summary>Synchronously waits for the key's lock without a timeout.</summary>
    public void Wait(TKey key, CancellationToken cancellationToken = default)
    {
        Wait(key, Timeout.Infinite, cancellationToken);
    }

    /// <summary>Releases a lock previously acquired for <paramref name="key"/>.</summary>
    /// <exception cref="InvalidOperationException">The key is not currently tracked.</exception>
    public void Release(TKey key)
    {
        ReleaseSemaphore(key, entered: true);
    }

    // Looks up (or assigns, preferably from the pool) the key's semaphore and bumps its count.
    private SemaphoreSlim GetSemaphore(TKey key)
    {
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var existing))
            {
                _perKey[key] = (existing.Semaphore, existing.Count + 1);
                return existing.Semaphore;
            }

            SemaphoreSlim recycled = null;
            lock (_pool)
            {
                if (_pool.Count > 0)
                {
                    recycled = _pool.Pop();
                }
            }

            SemaphoreSlim assigned = recycled ?? new SemaphoreSlim(1, 1);
            _perKey[key] = (assigned, 1);
            return assigned;
        }
    }

    // Drops one reference to the key's semaphore, releasing it if the caller held it, and
    // returns it to the pool (capacity permitting) once no references remain.
    private void ReleaseSemaphore(TKey key, bool entered)
    {
        SemaphoreSlim gate;
        int remaining;
        lock (_perKey)
        {
            if (!_perKey.TryGetValue(key, out var entry))
            {
                throw new InvalidOperationException("Key not found.");
            }

            gate = entry.Semaphore;
            remaining = entry.Count - 1;
            if (remaining == 0)
            {
                _perKey.Remove(key);
            }
            else
            {
                _perKey[key] = (gate, remaining);
            }
        }

        if (entered)
        {
            gate.Release();
        }

        if (remaining != 0)
        {
            return;
        }

        Debug.Assert(gate.CurrentCount == 1);
        lock (_pool)
        {
            if (_pool.Count < _poolCapacity)
            {
                _pool.Push(gate);
            }
        }
    }
}

用法示例:

// One KeyedLock instance, shared by all callers that must coordinate on the same keys.
var locker = new KeyedLock<string>();

// Acquire the lock for "Hello". The matching Release must run even if the work throws,
// hence the try/finally.
await locker.WaitAsync("Hello");
try
{
    await DoSomethingAsync();
}
finally
{
    locker.Release("Hello");
}

该实现使用了元组解构(tuple deconstruction),因此至少需要 C# 7。

KeyedLock class 可以很容易地修改为 KeyedSemaphore,这将允许每个键有多个并发操作。它只需要构造函数中的 maximumConcurrencyPerKey 参数,该参数将被存储并传递给 SemaphoreSlim 的构造函数。


注意:SemaphoreSlim 类在被误用时会抛出 SemaphoreFullException,即释放信号量的次数多于获取次数的情况。本回答的 KeyedLock 实现在误用时的表现不同:它会抛出 InvalidOperationException("Key not found.")。这是因为当某个键被释放的次数与被获取的次数相同时,相应的信号量就已经从字典中删除了。如果此实现抛出了 SemaphoreFullException,则说明实现本身存在 bug。

受到前面那个基于 LockInfo 引用计数的回答的启发,这里有一个支持异步等待的版本:

    /// <summary>
    /// Asynchronous per-key mutual exclusion backed by reference-counted LockInfo entries.
    /// </summary>
    public class KeyedLock<TKey>
    {
        private readonly ConcurrentDictionary<TKey, LockInfo> _locks = new();

        /// <summary>Number of keys that currently have an entry (held or waited on).</summary>
        public int Count => _locks.Count;

        /// <summary>
        /// Asynchronously acquires the lock for <paramref name="key"/>.
        /// </summary>
        /// <returns>A releaser; disposing it more than once has no further effect.</returns>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="cancellationToken"/> was cancelled before the lock was acquired.
        /// </exception>
        public async Task<IDisposable> WaitAsync(TKey key, CancellationToken cancellationToken = default)
        {
            LockInfo info = AcquireInfo(key);

            try
            {
                await info.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // The lock was not acquired (cancelled or failed): drop our reference only.
                Release(key, info, isCurrentlyLocked: false);
                throw;
            }

            return new Releaser(() => Release(key, info, isCurrentlyLocked: true));
        }

        // Returns a live LockInfo for the key with the caller's reference already counted.
        // No side effects run inside dictionary factories, so retries cannot skew the counts.
        private LockInfo AcquireInfo(TKey key)
        {
            while (true)
            {
                if (_locks.TryGetValue(key, out var existing))
                {
                    if (existing.TryEnter())
                    {
                        return existing;
                    }

                    // The existing info is dead; replace it unless someone else already did.
                    var replacement = new LockInfo();
                    if (_locks.TryUpdate(key, replacement, existing))
                    {
                        return replacement;
                    }
                }
                else
                {
                    var fresh = new LockInfo();
                    if (_locks.TryAdd(key, fresh))
                    {
                        return fresh;
                    }
                }
            }
        }

        private void Release(TKey key, LockInfo info, bool isCurrentlyLocked)
        {
            // Hand the semaphore back BEFORE dropping our reference. While we still count as
            // a reference, nobody can see the count reach zero and dispose the semaphore
            // underneath this Release() call.
            if (isCurrentlyLocked)
            {
                info.Semaphore.Release();
            }

            if (!info.Leave())
            {
                return;
            }

            // This was the last reference; the info is now permanently dead.
            // Only remove this exact info, in case another thread has
            // already put its own info into the dictionary.
            var entry = new KeyValuePair<TKey, LockInfo>(key, info);
            ((ICollection<KeyValuePair<TKey, LockInfo>>)_locks).Remove(entry);

            // Safe even if Remove failed: a dead info can never be entered again.
            info.Dispose();
        }

        private sealed class LockInfo : IDisposable
        {
            private SemaphoreSlim _semaphore = null;
            // Holder plus waiters. Starts at 1 for the creating caller; 0 is terminal.
            private int _refCount = 1;

            public SemaphoreSlim Semaphore
            {
                get
                {
                    // Lazily create the semaphore.
                    var s = Volatile.Read(ref _semaphore);
                    if (s is null)
                    {
                        s = new SemaphoreSlim(1, 1);

                        // Assign _semaphore if its current value is null.
                        var original = Interlocked.CompareExchange(ref _semaphore, s, null);

                        // If someone else already created a semaphore, return that one
                        if (original is not null)
                        {
                            s.Dispose();
                            return original;
                        }
                    }
                    return s;
                }
            }

            // Adds a reference unless the info is already dead (count 0), in which case it
            // returns false and leaves the count untouched, so a dead info cannot be revived.
            public bool TryEnter()
            {
                int current = Volatile.Read(ref _refCount);
                while (current > 0)
                {
                    int observed = Interlocked.CompareExchange(ref _refCount, current + 1, current);
                    if (observed == current)
                    {
                        return true;
                    }
                    current = observed;
                }
                return false;
            }

            // Drops a reference; returns true exactly once, for the caller that removes the last one.
            public bool Leave() => Interlocked.Decrement(ref _refCount) == 0;

            public void Dispose() => Volatile.Read(ref _semaphore)?.Dispose();
        }

        private sealed class Releaser : IDisposable
        {
            private Action _dispose;

            public Releaser(Action dispose) => _dispose = dispose;

            // Runs the release action at most once; a double Dispose must not release twice.
            public void Dispose() => Interlocked.Exchange(ref _dispose, null)?.Invoke();
        }
    }