如何仅在线程必须处理数据集的非重叠行时才锁定线程

How to lock threads only if they have to work on non overlapping rows of the dataset

我有那个在多线程中执行的方法。 该方法必须读取一些行,其中有两个 DateTime 字段,开始和停止。 该方法必须对数据集本身进行一些修改,并且必须对其他数据集进行其他操作。 目前我正在使用锁来锁定该方法的执行,以防止该方法从不同线程处理数据集。该解决方案有效但不是最佳的,理想情况下我只想在尝试修改一组共享行时锁定。当我尝试处理同一个数据集时,我想允许执行,但在数据集的不同时间范围内,这些数据集不会与执行一次重叠。

如果问题是“按行”锁定,我想使用并发字典,在其中使用行 ID 进行锁定,类似这样:

var key = myConcurrentDictionary.GetOrAdd(rowNr, new object());
lock (key)
{
    DoWork(rowNr);
}

但就我而言,我正在处理一系列行,但我不知道如何才能实现我的 objective。

这是一个自定义 RangeLock 实现,它允许在通用边界之间的范围内获取独占锁:

/// <summary>
/// A mechanism that enforces exclusive access to resources associated with ranges.
/// </summary>
public class RangeLock<TBoundary>
{
    // Represents a lock that is currently acquired or pending.
    private class Entry
    {
        public TBoundary Start { get; init; }
        public TBoundary End { get; init; }
        public TaskCompletionSource<object> TCS; // It's null when acquired.
        public int Countdown; // Number of older entries that are overlapping this.
        public List<Entry> Overlapped; // Newer entries that are overlapped by this.
    }

    private readonly IComparer<TBoundary> _comparer;
    private readonly List<Entry> _entries = new();

    public RangeLock(IComparer<TBoundary> comparer = default)
    {
        _comparer = comparer ?? Comparer<TBoundary>.Default;
    }

    /// <summary>Acquires an exclusive lock on the specified range.</summary>
    /// <param name="start">The inclusive lower bound of the range.</param>
    /// <param name="end">The exclusive upper bound of the range.</param>
    /// <returns>A token that identifies the acquired exclusive lock.</returns>
    public Task<object> WaitAsync(TBoundary start, TBoundary end)
    {
        if (_comparer.Compare(end, start) < 0)
            throw new ArgumentOutOfRangeException(nameof(end));
        var entry = new Entry() { Start = start, End = end };
        lock (_entries)
        {
            foreach (var older in _entries)
            {
                if (Overlaps(entry, older))
                {
                    (older.Overlapped ??= new()).Add(entry);
                    entry.Countdown++;
                }
            }
            _entries.Add(entry);
            if (entry.Countdown == 0) return Task.FromResult((object)entry);
            entry.TCS = new TaskCompletionSource<object>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            return entry.TCS.Task;
        }
    }

    public object Wait(TBoundary start, TBoundary end)
        => WaitAsync(start, end).GetAwaiter().GetResult();

    /// <summary>Releases a previously acquired exclusive lock.</summary>
    /// <param name="token">A token that identifies the exclusive lock.</param>
    public void Release(object token)
    {
        if (token == null) throw new ArgumentNullException(nameof(token));
        if (token is not Entry entry) throw new ArgumentException("Invalid token.");
        lock (_entries)
        {
            if (!_entries.Remove(entry))
                throw new ArgumentException("Unknown token.");
            if (entry.Overlapped == null) return;
            foreach (var overlapped in entry.Overlapped)
            {
                overlapped.Countdown--;
                if (overlapped.Countdown == 0)
                {
                    Debug.Assert(overlapped.TCS != null);
                    overlapped.TCS.SetResult(overlapped);
                    overlapped.TCS = null;
                }
            }
        }
    }

    private bool Overlaps(Entry entry1, Entry entry2)
    {
        return (
            _comparer.Compare(entry1.Start, entry2.Start) <= 0 &&
            _comparer.Compare(entry1.End, entry2.Start) > 0
        ) || (
            _comparer.Compare(entry2.Start, entry1.Start) <= 0 &&
            _comparer.Compare(entry2.End, entry1.Start) > 0
        );
    }
}

用法示例:

var locker = new RangeLock<DateTime>();

DateTime start = new DateTime(2022, 3, 1); // Inclusive
DateTime end = new DateTime(2022, 4, 1); // Exclusive
// This range represents the whole month March, 2022.

var token = await locker.WaitAsync(start, end);
try
{
    // Here do something with the range between start and end.
    // Other execution flows that want to acquire a lock on an overlapping
    // range, will have to wait until this execution flow releases the lock.
}
finally { locker.Release(token); }

这个实现既有同步的也有异步的API。它适用于在低争用情况下同步中等数量的 threads/asynchronous 工作流,以及在持有锁的情况下进行大量工作。它 适合同步大量正在争夺拥挤 range-space 的流,并且正在做细粒度(轻量级)的工作。在这种情况下,简单的 SemaphoreSlim(1, 1)lock 可能比 RangeLock 具有更好的性能。 WaitAsyncRelease 方法的复杂度都是 O(n)。 支持CancellationToken或超时。添加此功能并非易事。

可以找到验证上述实现正确性的演示应用程序here

可以在此答案的 first revision 中找到基于 Wait/Pulse 机制(仅提供同步 API)的替代实现。

可以在此答案的 4th revision 中找到具有 int 边界的非通用实现。它具有 O(n²) 复杂度。当执行流的数量较少时,它可能会更有效。