如何仅在线程必须处理数据集的非重叠行时才锁定线程
How to lock threads only if they have to work on non overlapping rows of the dataset
我有一个会在多个线程中执行的方法。
该方法必须读取一些行,其中有两个 DateTime 字段,开始和停止。
该方法必须对数据集本身进行一些修改,并且必须对其他数据集进行其他操作。
目前我使用一个锁来锁定整个方法的执行,以防止多个线程同时处理数据集。该方案可行,但并不理想:理想情况下,我只想在多个线程试图修改同一组行时才加锁。当另一个线程处理的是同一数据集中与正在执行的时间范围不重叠的另一段时间范围时,我希望允许它并发执行。
如果问题是“按行”锁定,我想使用并发字典,在其中使用行 ID 进行锁定,类似这样:
// Fetch the lock object dedicated to this row, creating it on first use.
// The value-factory overload is used so that a fresh object is only allocated
// when the key is missing, instead of on every call.
// NOTE(review): assumes myConcurrentDictionary is a ConcurrentDictionary whose
// value type is object - confirm against its declaration.
var key = myConcurrentDictionary.GetOrAdd(rowNr, _ => new object());
lock (key)
{
    // Only callers working on the same row number contend for this monitor.
    DoWork(rowNr);
}
但就我而言,我处理的是一个范围内的多行,我不知道该如何实现我的目标。
这是一个自定义 RangeLock
实现,它允许在通用边界之间的范围内获取独占锁:
/// <summary>
/// Hands out exclusive locks on half-open ranges <c>[start, end)</c>. A request is
/// granted once every earlier request that overlaps it has been released;
/// requests on non-overlapping ranges do not wait for each other.
/// </summary>
/// <typeparam name="TBoundary">Type of the values that delimit a range.</typeparam>
public class RangeLock<TBoundary>
{
    // One record per lock request, whether already granted or still waiting.
    private class Entry
    {
        public TBoundary Start { get; init; }
        public TBoundary End { get; init; }

        // Completion source the waiter observes; it is null for a granted entry.
        public TaskCompletionSource<object> TCS;

        // How many earlier overlapping entries have not been released yet.
        public int Countdown;

        // Later entries blocked by this one; created lazily on the first overlap.
        public List<Entry> Overlapped;
    }

    private readonly IComparer<TBoundary> _comparer;

    // Every live entry in arrival order. The list is also the monitor that
    // guards all bookkeeping in this class.
    private readonly List<Entry> _entries = new();

    /// <summary>Creates a range lock.</summary>
    /// <param name="comparer">
    /// Ordering applied to the boundaries; <see cref="Comparer{T}.Default"/> is
    /// used when this is null or omitted.
    /// </param>
    public RangeLock(IComparer<TBoundary> comparer = default)
        => _comparer = comparer ?? Comparer<TBoundary>.Default;

    /// <summary>Requests an exclusive lock on the given range.</summary>
    /// <param name="start">Lower bound of the range (inclusive).</param>
    /// <param name="end">Upper bound of the range (exclusive).</param>
    /// <returns>
    /// A task that completes with the token of the lock once it has been granted.
    /// The task is already completed when no earlier entry overlaps the range.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="end"/> sorts before <paramref name="start"/>.
    /// </exception>
    public Task<object> WaitAsync(TBoundary start, TBoundary end)
    {
        if (_comparer.Compare(end, start) < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(end));
        }

        var candidate = new Entry() { Start = start, End = end };
        lock (_entries)
        {
            // Register the candidate with every earlier entry it collides with,
            // so that each of them can signal it when released.
            foreach (var existing in _entries)
            {
                if (!Overlaps(candidate, existing))
                {
                    continue;
                }

                existing.Overlapped ??= new List<Entry>();
                existing.Overlapped.Add(candidate);
                candidate.Countdown++;
            }

            _entries.Add(candidate);

            // Nothing in the way: the lock is granted on the spot.
            if (candidate.Countdown == 0)
            {
                return Task.FromResult<object>(candidate);
            }

            // Otherwise the caller waits; continuations are forced to run
            // asynchronously so they never execute inside Release's lock.
            var pending = new TaskCompletionSource<object>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            candidate.TCS = pending;
            return pending.Task;
        }
    }

    /// <summary>
    /// Blocking counterpart of <see cref="WaitAsync"/>: returns the token once the
    /// lock on the range has been granted.
    /// </summary>
    /// <param name="start">Lower bound of the range (inclusive).</param>
    /// <param name="end">Upper bound of the range (exclusive).</param>
    /// <returns>The token that identifies the granted lock.</returns>
    public object Wait(TBoundary start, TBoundary end)
    {
        Task<object> acquisition = WaitAsync(start, end);
        return acquisition.GetAwaiter().GetResult();
    }

    /// <summary>Gives back a lock obtained from this instance.</summary>
    /// <param name="token">The token returned when the lock was granted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="token"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// The token was not produced by a range lock, or it is not currently
    /// registered in this instance (for example, it was already released).
    /// </exception>
    public void Release(object token)
    {
        if (token is null)
        {
            throw new ArgumentNullException(nameof(token));
        }

        var released = token as Entry;
        if (released is null)
        {
            throw new ArgumentException("Invalid token.");
        }

        lock (_entries)
        {
            bool wasRegistered = _entries.Remove(released);
            if (!wasRegistered)
            {
                throw new ArgumentException("Unknown token.");
            }

            var blocked = released.Overlapped;
            if (blocked is null)
            {
                return;
            }

            // Each later entry that collided with the released one has one
            // obstacle fewer; those with none left are granted their lock.
            for (int i = 0; i < blocked.Count; i++)
            {
                var waiter = blocked[i];
                waiter.Countdown--;
                if (waiter.Countdown != 0)
                {
                    continue;
                }

                Debug.Assert(waiter.TCS != null);
                waiter.TCS.SetResult(waiter);
                waiter.TCS = null;
            }
        }
    }

    // Two entries overlap when either one begins at or before the other's start
    // and ends strictly after that start.
    private bool Overlaps(Entry entry1, Entry entry2)
    {
        bool ReachesInto(Entry first, Entry second)
        {
            return _comparer.Compare(first.Start, second.Start) <= 0
                && _comparer.Compare(first.End, second.Start) > 0;
        }

        return ReachesInto(entry1, entry2) || ReachesInto(entry2, entry1);
    }
}
用法示例:
// A single RangeLock instance is created and used for the whole example.
RangeLock<DateTime> rangeLock = new();

// Half-open interval [2022-03-01, 2022-04-01): the whole month of March 2022.
var marchStart = new DateTime(2022, 3, 1); // Included in the range
var aprilStart = new DateTime(2022, 4, 1); // Excluded from the range

object lease = await rangeLock.WaitAsync(marchStart, aprilStart);
try
{
    // Work on the data between marchStart and aprilStart goes here.
    // Any other execution flow asking for a range that overlaps this one
    // is held back until the lock is released below.
}
finally
{
    // Always hand the lock back, even if the work above throws.
    rangeLock.Release(lease);
}
这个实现同时提供同步和异步 API。它适用于在低争用情况下同步中等数量的线程/异步工作流,并且在持有锁期间执行的工作量较大的场景。它不适合同步大量争夺拥挤范围空间(range-space)、且只做细粒度(轻量级)工作的执行流;在这种情况下,简单的 SemaphoreSlim(1, 1) 或 lock 可能比 RangeLock 具有更好的性能。WaitAsync 和 Release 方法的复杂度都是 O(n)。不支持 CancellationToken 或超时;添加这些功能并非易事。
可以找到验证上述实现正确性的演示应用程序here。
可以在此答案的 first revision 中找到基于 Wait
/Pulse
机制(仅提供同步 API)的替代实现。
可以在此答案的 4th revision 中找到具有 int
边界的非通用实现。它具有 O(n²) 复杂度。当执行流的数量较少时,它可能会更有效。
我有一个会在多个线程中执行的方法。该方法必须读取一些行,其中有两个 DateTime 字段,开始和停止。该方法必须对数据集本身进行一些修改,并且必须对其他数据集进行其他操作。目前我使用一个锁来锁定整个方法的执行,以防止多个线程同时处理数据集。该方案可行,但并不理想:理想情况下,我只想在多个线程试图修改同一组行时才加锁。当另一个线程处理的是同一数据集中与正在执行的时间范围不重叠的另一段时间范围时,我希望允许它并发执行。
如果问题是“按行”锁定,我想使用并发字典,在其中使用行 ID 进行锁定,类似这样:
// Fetch the lock object dedicated to this row, creating it on first use.
// The value-factory overload is used so that a fresh object is only allocated
// when the key is missing, instead of on every call.
// NOTE(review): assumes myConcurrentDictionary is a ConcurrentDictionary whose
// value type is object - confirm against its declaration.
var key = myConcurrentDictionary.GetOrAdd(rowNr, _ => new object());
lock (key)
{
    // Only callers working on the same row number contend for this monitor.
    DoWork(rowNr);
}
但就我而言,我处理的是一个范围内的多行,我不知道该如何实现我的目标。
这是一个自定义 RangeLock
实现,它允许在通用边界之间的范围内获取独占锁:
/// <summary>
/// Hands out exclusive locks on half-open ranges <c>[start, end)</c>. A request is
/// granted once every earlier request that overlaps it has been released;
/// requests on non-overlapping ranges do not wait for each other.
/// </summary>
/// <typeparam name="TBoundary">Type of the values that delimit a range.</typeparam>
public class RangeLock<TBoundary>
{
    // One record per lock request, whether already granted or still waiting.
    private class Entry
    {
        public TBoundary Start { get; init; }
        public TBoundary End { get; init; }

        // Completion source the waiter observes; it is null for a granted entry.
        public TaskCompletionSource<object> TCS;

        // How many earlier overlapping entries have not been released yet.
        public int Countdown;

        // Later entries blocked by this one; created lazily on the first overlap.
        public List<Entry> Overlapped;
    }

    private readonly IComparer<TBoundary> _comparer;

    // Every live entry in arrival order. The list is also the monitor that
    // guards all bookkeeping in this class.
    private readonly List<Entry> _entries = new();

    /// <summary>Creates a range lock.</summary>
    /// <param name="comparer">
    /// Ordering applied to the boundaries; <see cref="Comparer{T}.Default"/> is
    /// used when this is null or omitted.
    /// </param>
    public RangeLock(IComparer<TBoundary> comparer = default)
        => _comparer = comparer ?? Comparer<TBoundary>.Default;

    /// <summary>Requests an exclusive lock on the given range.</summary>
    /// <param name="start">Lower bound of the range (inclusive).</param>
    /// <param name="end">Upper bound of the range (exclusive).</param>
    /// <returns>
    /// A task that completes with the token of the lock once it has been granted.
    /// The task is already completed when no earlier entry overlaps the range.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="end"/> sorts before <paramref name="start"/>.
    /// </exception>
    public Task<object> WaitAsync(TBoundary start, TBoundary end)
    {
        if (_comparer.Compare(end, start) < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(end));
        }

        var candidate = new Entry() { Start = start, End = end };
        lock (_entries)
        {
            // Register the candidate with every earlier entry it collides with,
            // so that each of them can signal it when released.
            foreach (var existing in _entries)
            {
                if (!Overlaps(candidate, existing))
                {
                    continue;
                }

                existing.Overlapped ??= new List<Entry>();
                existing.Overlapped.Add(candidate);
                candidate.Countdown++;
            }

            _entries.Add(candidate);

            // Nothing in the way: the lock is granted on the spot.
            if (candidate.Countdown == 0)
            {
                return Task.FromResult<object>(candidate);
            }

            // Otherwise the caller waits; continuations are forced to run
            // asynchronously so they never execute inside Release's lock.
            var pending = new TaskCompletionSource<object>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            candidate.TCS = pending;
            return pending.Task;
        }
    }

    /// <summary>
    /// Blocking counterpart of <see cref="WaitAsync"/>: returns the token once the
    /// lock on the range has been granted.
    /// </summary>
    /// <param name="start">Lower bound of the range (inclusive).</param>
    /// <param name="end">Upper bound of the range (exclusive).</param>
    /// <returns>The token that identifies the granted lock.</returns>
    public object Wait(TBoundary start, TBoundary end)
    {
        Task<object> acquisition = WaitAsync(start, end);
        return acquisition.GetAwaiter().GetResult();
    }

    /// <summary>Gives back a lock obtained from this instance.</summary>
    /// <param name="token">The token returned when the lock was granted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="token"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// The token was not produced by a range lock, or it is not currently
    /// registered in this instance (for example, it was already released).
    /// </exception>
    public void Release(object token)
    {
        if (token is null)
        {
            throw new ArgumentNullException(nameof(token));
        }

        var released = token as Entry;
        if (released is null)
        {
            throw new ArgumentException("Invalid token.");
        }

        lock (_entries)
        {
            bool wasRegistered = _entries.Remove(released);
            if (!wasRegistered)
            {
                throw new ArgumentException("Unknown token.");
            }

            var blocked = released.Overlapped;
            if (blocked is null)
            {
                return;
            }

            // Each later entry that collided with the released one has one
            // obstacle fewer; those with none left are granted their lock.
            for (int i = 0; i < blocked.Count; i++)
            {
                var waiter = blocked[i];
                waiter.Countdown--;
                if (waiter.Countdown != 0)
                {
                    continue;
                }

                Debug.Assert(waiter.TCS != null);
                waiter.TCS.SetResult(waiter);
                waiter.TCS = null;
            }
        }
    }

    // Two entries overlap when either one begins at or before the other's start
    // and ends strictly after that start.
    private bool Overlaps(Entry entry1, Entry entry2)
    {
        bool ReachesInto(Entry first, Entry second)
        {
            return _comparer.Compare(first.Start, second.Start) <= 0
                && _comparer.Compare(first.End, second.Start) > 0;
        }

        return ReachesInto(entry1, entry2) || ReachesInto(entry2, entry1);
    }
}
用法示例:
// A single RangeLock instance is created and used for the whole example.
RangeLock<DateTime> rangeLock = new();

// Half-open interval [2022-03-01, 2022-04-01): the whole month of March 2022.
var marchStart = new DateTime(2022, 3, 1); // Included in the range
var aprilStart = new DateTime(2022, 4, 1); // Excluded from the range

object lease = await rangeLock.WaitAsync(marchStart, aprilStart);
try
{
    // Work on the data between marchStart and aprilStart goes here.
    // Any other execution flow asking for a range that overlaps this one
    // is held back until the lock is released below.
}
finally
{
    // Always hand the lock back, even if the work above throws.
    rangeLock.Release(lease);
}
这个实现同时提供同步和异步 API。它适用于在低争用情况下同步中等数量的线程/异步工作流,并且在持有锁期间执行的工作量较大的场景。它不适合同步大量争夺拥挤范围空间(range-space)、且只做细粒度(轻量级)工作的执行流;在这种情况下,简单的 SemaphoreSlim(1, 1) 或 lock 可能比 RangeLock 具有更好的性能。WaitAsync 和 Release 方法的复杂度都是 O(n)。不支持 CancellationToken 或超时;添加这些功能并非易事。
可以找到验证上述实现正确性的演示应用程序here。
可以在此答案的 first revision 中找到基于 Wait
/Pulse
机制(仅提供同步 API)的替代实现。
可以在此答案的 4th revision 中找到具有 int
边界的非通用实现。它具有 O(n²) 复杂度。当执行流的数量较少时,它可能会更有效。