SemaphoreSlim 等待优先级

SemaphoreSlim Await Priority

我想知道 SemaphoreSlim 在调用 Await 时是否有类似优先级的东西。

我还没有找到任何东西,但也许有人以前做过这样的事情。

我的想法是:如果有需要,我之后可以用更高的优先级在该信号量上调用 await,让这次 await 先于其他等待者返回。

不,SemaphoreSlim 中没有优先级,无论您使用的是同步锁定还是异步锁定。

很少需要异步锁的优先级。通常这类问题如果退一步看大局,会有更优雅的解决方案。

这里有一个 PrioritySemaphore<TPriority> 类,可以按优先级获取信号量。它在内部基于 SortedSet 集合实现。

/// <summary>
/// An asynchronous counting semaphore whose waiters are released in priority order.
/// Waiters are ordered by <typeparamref name="TPriority"/> using the supplied comparer
/// (the smallest value is released first); waiters with equal priority are released
/// in the order in which they were enqueued.
/// </summary>
/// <typeparam name="TPriority">The type of the priority attached to each waiter.</typeparam>
public class PrioritySemaphore<TPriority>
{
    private readonly PriorityQueue _priorityQueue;
    // Guards _currentCount, _indexSeed, _priorityQueue and the per-wait state in WaitAsync.
    private readonly object _locker = new object();
    private readonly int _maxCount;
    private int _currentCount;
    // Monotonically increasing sequence number; breaks ties between equal priorities.
    private long _indexSeed = 0;

    /// <summary>Initializes the semaphore.</summary>
    /// <param name="initialCount">Number of permits available immediately.</param>
    /// <param name="maxCount">Upper bound of the permit count.</param>
    /// <param name="comparer">
    /// Comparer used to order priorities, or <c>null</c> to use
    /// <see cref="Comparer{T}.Default"/>.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="initialCount"/> is negative, or <paramref name="maxCount"/>
    /// is not positive.
    /// </exception>
    public PrioritySemaphore(int initialCount, int maxCount,
        IComparer<TPriority> comparer = null)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        if (maxCount <= 0) throw new ArgumentOutOfRangeException(nameof(maxCount));

        _priorityQueue = new PriorityQueue(comparer);
        _currentCount = initialCount;
        _maxCount = maxCount;
    }

    /// <summary>
    /// Initializes the semaphore with the given initial count and a maximum count of
    /// <see cref="Int32.MaxValue"/>.
    /// </summary>
    public PrioritySemaphore(int initialCount, IComparer<TPriority> comparer = null)
        : this(initialCount, Int32.MaxValue, comparer) { }

    /// <summary>
    /// Initializes the semaphore with no available permits and a maximum count of
    /// <see cref="Int32.MaxValue"/>.
    /// </summary>
    public PrioritySemaphore(IComparer<TPriority> comparer = null)
        : this(0, Int32.MaxValue, comparer) { }

    /// <summary>Gets the number of permits that can currently be acquired without waiting.</summary>
    public int CurrentCount { get { lock (_locker) return _currentCount; } }

    /// <summary>
    /// Asynchronously acquires a permit, waiting with the given priority when none is
    /// available.
    /// </summary>
    /// <param name="priority">Priority of this waiter.</param>
    /// <param name="millisecondsTimeout">
    /// Maximum time to wait; <c>0</c> to try without waiting, or <c>-1</c>
    /// (<see cref="Timeout.Infinite"/>) to wait without a timeout.
    /// </param>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>
    /// <c>true</c> when a permit was acquired; <c>false</c> when the timeout elapsed first.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeout"/> is less than -1.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was canceled before a permit was acquired.
    /// </exception>
    public async Task<bool> WaitAsync(TPriority priority, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        if (millisecondsTimeout < -1)
            throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout));

        cancellationToken.ThrowIfCancellationRequested();

        // Fast path: take a permit right away when one is available.
        lock (_locker)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return true;
            }
        }
        if (millisecondsTimeout == 0) return false;

        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        long entryIndex = -1;       // -1 until this waiter has been enqueued
        bool taskCompleted = false; // set once the outcome of this wait is decided here

        // Called by the timeout and cancellation callbacks. Returns true when the caller
        // is the one that ends this wait: either the waiter has not been enqueued yet,
        // or its entry was still in the queue and has now been removed. Returns false
        // when the wait was already decided or Release() has already dequeued the entry.
        bool TryAbandonWait()
        {
            lock (_locker)
            {
                if (taskCompleted) return false;
                if (entryIndex != -1 && !_priorityQueue.Remove(priority, entryIndex))
                    return false;
                taskCompleted = true;
                return true;
            }
        }

        // The timer and the registration are created outside the lock because their
        // callbacks acquire the lock themselves (the cancellation callback may even run
        // synchronously inside Register).
        Timer timer = null;
        if (millisecondsTimeout > 0)
        {
            timer = new Timer(_ =>
            {
                if (TryAbandonWait()) tcs.TrySetResult(false);
            }, null, millisecondsTimeout, Timeout.Infinite);
        }

        CancellationTokenRegistration registration = default;
        if (cancellationToken.CanBeCanceled)
        {
            registration = cancellationToken.Register(() =>
            {
                if (TryAbandonWait()) tcs.TrySetCanceled(cancellationToken);
            });
        }

        bool acquired = false;
        lock (_locker)
        {
            if (!taskCompleted)
            {
                if (_currentCount > 0)
                {
                    // A Release() ran after the fast-path check, found no waiter and
                    // incremented the count. Take that permit now instead of enqueuing,
                    // otherwise this waiter would wait although a permit is available.
                    _currentCount--;
                    taskCompleted = true;
                    acquired = true;
                }
                else
                {
                    entryIndex = _indexSeed++;
                    _priorityQueue.Enqueue(priority, entryIndex, tcs, timer, registration);
                }
            }
        }

        try
        {
            if (acquired) return true;
            return await tcs.Task.ConfigureAwait(false);
        }
        finally
        {
            // Always release the timer and the token registration, whichever way the
            // wait ended (permit, timeout or cancellation). Release() may have disposed
            // them already; disposing again is harmless.
            timer?.Dispose();
            registration.Dispose();
        }
    }

    /// <summary>
    /// Asynchronously acquires a permit with the given priority, without a timeout.
    /// </summary>
    /// <param name="priority">Priority of this waiter.</param>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>A task that completes when the permit has been acquired.</returns>
    public Task WaitAsync(TPriority priority,
        CancellationToken cancellationToken = default)
    {
        return WaitAsync(priority, Timeout.Infinite, cancellationToken);
    }

    /// <summary>
    /// Releases one permit. When waiters are queued, the permit is handed directly to
    /// the first one in priority order; otherwise the available count is incremented.
    /// </summary>
    /// <exception cref="SemaphoreFullException">
    /// No waiter is queued and the count has already reached the maximum.
    /// </exception>
    public void Release()
    {
        TaskCompletionSource<bool> tcs;
        Timer timer;
        CancellationTokenRegistration registration;
        lock (_locker)
        {
            if (_priorityQueue.IsEmpty)
            {
                if (_currentCount >= _maxCount) throw new SemaphoreFullException();
                _currentCount++;
                return;
            }
            (tcs, timer, registration) = _priorityQueue.Dequeue();
        }
        // Complete the waiter and clean up outside the lock: the callbacks being
        // disposed acquire the same lock.
        tcs.TrySetResult(true);
        timer?.Dispose();
        registration.Dispose();
    }

    /// <summary>
    /// Queue of waiters backed by a <see cref="SortedSet{T}"/>. Entries are ordered by
    /// priority and then by enqueue index; the remaining tuple fields are payload and
    /// take no part in the comparison.
    /// </summary>
    private class PriorityQueue : IComparer<(TPriority Priority, long Index,
        TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)>
    {
        private readonly SortedSet<(TPriority Priority, long Index,
            TaskCompletionSource<bool> TCS, Timer Timer,
            CancellationTokenRegistration Registration)> _sortedSet;

        private readonly IComparer<TPriority> _priorityComparer;
        private readonly Comparer<long> _indexComparer = Comparer<long>.Default;

        public PriorityQueue(IComparer<TPriority> comparer)
        {
            _priorityComparer = comparer ?? Comparer<TPriority>.Default;
            _sortedSet = new SortedSet<(TPriority Priority, long Index,
            TaskCompletionSource<bool> TCS, Timer Timer,
            CancellationTokenRegistration Registration)>(this);
        }

        /// <summary>Gets whether the queue contains no waiters.</summary>
        public bool IsEmpty => _sortedSet.Count == 0;

        /// <summary>Adds a waiter together with the resources that belong to it.</summary>
        public void Enqueue(TPriority priority, long index,
            TaskCompletionSource<bool> tcs, Timer timer,
            CancellationTokenRegistration registration)
        {
            _sortedSet.Add((priority, index, tcs, timer, registration));
        }

        /// <summary>Removes and returns the first waiter in sort order.</summary>
        public (TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)
            Dequeue()
        {
            Debug.Assert(_sortedSet.Count > 0);
            var entry = _sortedSet.Min;
            _sortedSet.Remove(entry);
            return (entry.TCS, entry.Timer, entry.Registration);
        }

        /// <summary>
        /// Removes the waiter identified by priority and index.
        /// </summary>
        /// <returns><c>true</c> when such an entry was present and has been removed.</returns>
        public bool Remove(TPriority priority, long index)
        {
            // Only priority and index take part in the comparison, so a probe tuple
            // with default payload fields is enough to locate the entry.
            return _sortedSet.Remove((priority, index, default, default, default));
        }

        /// <summary>Orders entries by priority first and by enqueue index second.</summary>
        public int Compare((TPriority Priority, long Index,
            TaskCompletionSource<bool>, Timer, CancellationTokenRegistration) x,
            (TPriority Priority, long Index, TaskCompletionSource<bool>, Timer,
            CancellationTokenRegistration) y)
        {
            int result = _priorityComparer.Compare(x.Priority, y.Priority);
            if (result == 0) result = _indexComparer.Compare(x.Index, y.Index);
            return result;
        }
    }
}

用法示例:

// The parameterless constructor starts with no available permits, so every
// WaitAsync call below waits until Release is called.
var semaphore = new PrioritySemaphore<int>();
//...
// Waiter with priority 1.
// NOTE(review): the two waits are presumably issued from different asynchronous
// flows; awaited one after the other in a single flow, the first would never complete.
await semaphore.WaitAsync(priority: 1);
//...
// Waiter with priority 2.
await semaphore.WaitAsync(priority: 2);
//...
// Hands the permit to the queued waiter that sorts first (priority 1 here).
semaphore.Release();

调用 Release 之后,信号量会被优先级最高的等待者获取。在上面的示例中,获取信号量的将是优先级为 1 的等待者。值越小,优先级越高。如果有多个等待者具有相同的最高优先级,信号量将由最先发出请求的等待者获取(保持 FIFO 顺序)。

PrioritySemaphore<TPriority> 类只提供异步 API。它支持带超时的等待和 CancellationToken,但这些功能尚未经过充分测试。


注:.NET 6 引入了 PriorityQueue<TElement, TPriority> 类,理论上可以用它来简化上述实现。遗憾的是,这个新类不支持从队列中删除特定元素,只支持出队。而要实现 PrioritySemaphore<TPriority> 类的取消和超时功能,就必须能够从队列中删除特定元素,因此这个新类无法用于上面的实现。