对具有相同属性的命令进行并行处理的最佳方式
Best way to order parallel processing of commands with the same attribute
我有一列命令。其中一些具有相同的属性,例如“文档 ID”。我需要能够并行处理它们,但有一个限制:具有相同功能的命令应按照它们在队列中出现的顺序进行处理。
例如:
我的队列是 [n, a, s, j, a, l, v, g, a, f, f],其中字母是 DocumentId。我需要并行处理,但是 'a' 的处理应该按照它们在队列中出现的顺序进行,即 [1, 4, 8],其中数字是队列中字母的 ID .因此,这些元素的处理顺序无关紧要,只要 [8] 在 [4] 之后即可[1](它们之间有任意数量的中间项)。
首先,我尝试对 DocumentId 进行 SemaphoreSlim 锁定。这意味着,如果我们处理一个项目,如果其他线程应该处理同一个项目,我们就会阻塞它们。
这没有用,因为 SemaphoreSlim 不保证解除阻塞的 FIFO 顺序。
然后,我围绕 SemaphoreSlim 做了一个包装器,以强制执行 FIFO 解锁:
public class FifoAsyncLock : IDisposable
{
private readonly SemaphoreSlim _sem = new (1, 1);
private readonly ConcurrentQueue<TaskCompletionSource> _queue = new ();
public async Task WaitAsync()
{
var tcsE = new TaskCompletionSource();
_queue.Enqueue(tcsE);
await _sem.WaitAsync();
if (_queue.TryDequeue(out var tcsD))
tcsD.SetResult();
await tcsE.Task;
}
public void Release()
{
_sem.Release();
}
public void Dispose()
{
_sem.Dispose();
}
}
我在 class 中使用它,我在其中为每个 DocumentId 存储了一个信号量,并且还记录了有多少锁定用户正在等待解锁。如果最后一个用户释放锁,则删除(因为内存):
public class DocIdLocker : IDisposable
{
private readonly ConcurrentDictionary<Guid, FifoAsyncLock> _docIdLocks = new ();
private readonly ConcurrentDictionary<Guid, int> _users = new ();
private bool _disposed;
public async Task<IAsyncDisposable> AquireLockAsync(Guid docId)
{
var userCount = _users.AddOrUpdate(docId, 1, (_, o) => o + 1);
await _docIdLocks.GetOrAdd(docId, new FifoAsyncLock()).WaitAsync();
return new Lock(this, docId);
}
private async Task Release(Guid docId)
{
if (!_docIdLocks.ContainsKey(docId))
throw new KeyNotFoundException($"Key not found: '{docId}'");
_docIdLocks[docId].Release();
if (!_users.ContainsKey(docId))
throw new KeyNotFoundException($"Key not found: '{docId}'");
if (--_users[docId] == 0)
{
_docIdLocks.TryRemove(docId, out _);
_users.TryRemove(docId, out _);
}
}
private class Lock : IAsyncDisposable
{
private readonly DocIdLocker _parent;
private readonly Guid _docId;
public Lock(DocIdLocker parent, Guid docId)
{
_parent = parent;
_docId = docId;
}
public ValueTask DisposeAsync() => new (_parent.Release(_docId));
}
public void Dispose()
{
if (_disposed)
return;
foreach (var item in _docIdLocks.Values)
item.Dispose();
_users.Clear();
_disposed = true;
}
}
但我的测试仍然显示 'a' 的顺序未保留。
我想,也许有些线程在锁不存在时获取元素,并乱序处理它们。好吧,这一切都变得很难推理,现在我脑子里乱七八糟。
是否有一种简单而优雅的方法来实现我想要实现的目标?
解决这个问题最简单的方法可能是根据命令的相关属性对命令进行分组,然后并行处理这些组而不是单个文档。然后对每一组执行一个顺序的foreach
循环,将相关的文档一个一个处理。示例:
string[] documents = new[] { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' }
.Select((item, index) => $"{item}-{index}")
.ToArray();
Console.WriteLine($"Documents: [{String.Join(", ", documents)}]");
var grouped = documents.GroupBy(item => item[0]); // Group by the first char
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(grouped, options, grouping =>
{
foreach (var document in grouping)
{
Console.WriteLine($"Processing: {document}");
Thread.Sleep(500); // Simulate a CPU-bound or blocking operation
}
});
输出:
Documents: [n-0, a-1, s-2, j-3, a-4, l-5, v-6, g-7, a-8, f-9, f-10]
Processing: a-1
Processing: n-0
Processing: s-2
Processing: j-3
Processing: a-4
Processing: l-5
Processing: v-6
Processing: g-7
Processing: f-9
Processing: a-8
Processing: f-10
GroupBy
LINQ 运算符的排序行为已明确定义。根据文档:
The IGrouping<TKey,TElement>
objects are yielded in an order based on the order of the elements in source that produced the first key of each IGrouping<TKey,TElement>
. Elements in a grouping are yielded in the order that the elements that produced them appear in source.
这种方法有一些缺点:
- 在开始并行处理之前必须完全枚举源序列。如果源序列是延迟可枚举的,例如
BlockingCollection<T>
包含来自并行生产者的实时项目,这可能是一个问题。
- 处理顺序由第一个唯一键在源序列中出现的顺序决定,而不是由项目本身的顺序决定。因此,例如如果源是 (A, B, A, A, A, A, A) 并且并行度为 1,则 B 项将最后处理。
- 生成的分区方案可能不平衡。如果存在包含大量元素的键,并且这些键在源序列中出现较晚,则并行处理可能会在操作结束时并行化程度降低。为缓解此问题,最好根据组包含的项目数按降序对组重新排序。
下面是自定义 LINQ 运算符 ToConsumableGroupings
,它可能比标准 GroupBy
运算符更适合这种情况。它解决了前面提到的大部分问题,因为它懒惰地枚举源序列,并发出分组 on-the-go。它与 GroupBy
运算符具有相同的签名:
/// <summary>
/// Groups the elements of a sequence into consumable groupings, according to
/// a specified key selector function.
/// </summary>
/// <remarks>
/// For each key, more than one groupings can be emitted. A new grouping can be emitted
/// if the previously emitted grouping for the same key has been fully consumed.
/// </remarks>
public static IEnumerable<IGrouping<TKey, TSource>>
ToConsumableGroupings<TKey, TSource>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> keyComparer = default)
{
var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
foreach (var item in source)
{
var key = keySelector(item);
lock (perKey)
{
if (perKey.TryGetValue(key, out var queue))
{
queue.Enqueue(item); continue;
}
queue = perKey[key] = new Queue<TSource>();
queue.Enqueue(item);
}
yield return new Grouping<TKey, TSource>(key, GetGroup(key));
}
IEnumerable<TSource> GetGroup(TKey key)
{
while (true)
{
TSource item;
lock (perKey)
{
var queue = perKey[key];
if (queue.Count == 0) { perKey.Remove(key); break; }
item = queue.Dequeue();
}
yield return item;
}
}
}
private class Grouping<TKey, TSource> : IGrouping<TKey, TSource>
{
private readonly TKey _key;
private readonly IEnumerable<TSource> _sequence;
public Grouping(TKey key, IEnumerable<TSource> sequence)
{
_key = key;
_sequence = sequence;
}
public TKey Key => _key;
public IEnumerator<TSource> GetEnumerator() => _sequence.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
用法示例:
var grouped = documents.ToConsumableGroupings(item => item[0]);
与 GroupBy
运算符不同,ToConsumableGroupings
运算符发出 non-materialized 组,预计只会被枚举(消耗)一次。
我有一列命令。其中一些具有相同的属性,例如“文档 ID”。我需要能够并行处理它们,但有一个限制:具有相同功能的命令应按照它们在队列中出现的顺序进行处理。
例如: 我的队列是 [n, a, s, j, a, l, v, g, a, f, f],其中字母是 DocumentId。我需要并行处理,但是 'a' 的处理应该按照它们在队列中出现的顺序进行,即 [1, 4, 8],其中数字是队列中字母的 ID .因此,这些元素的处理顺序无关紧要,只要 [8] 在 [4] 之后即可[1](它们之间有任意数量的中间项)。
首先,我尝试对 DocumentId 进行 SemaphoreSlim 锁定。这意味着,如果我们处理一个项目,如果其他线程应该处理同一个项目,我们就会阻塞它们。 这没有用,因为 SemaphoreSlim 不保证解除阻塞的 FIFO 顺序。
然后,我围绕 SemaphoreSlim 做了一个包装器,以强制执行 FIFO 解锁:
public class FifoAsyncLock : IDisposable
{
private readonly SemaphoreSlim _sem = new (1, 1);
private readonly ConcurrentQueue<TaskCompletionSource> _queue = new ();
public async Task WaitAsync()
{
var tcsE = new TaskCompletionSource();
_queue.Enqueue(tcsE);
await _sem.WaitAsync();
if (_queue.TryDequeue(out var tcsD))
tcsD.SetResult();
await tcsE.Task;
}
public void Release()
{
_sem.Release();
}
public void Dispose()
{
_sem.Dispose();
}
}
我在 class 中使用它,我在其中为每个 DocumentId 存储了一个信号量,并且还记录了有多少锁定用户正在等待解锁。如果最后一个用户释放锁,则删除(因为内存):
public class DocIdLocker : IDisposable
{
private readonly ConcurrentDictionary<Guid, FifoAsyncLock> _docIdLocks = new ();
private readonly ConcurrentDictionary<Guid, int> _users = new ();
private bool _disposed;
public async Task<IAsyncDisposable> AquireLockAsync(Guid docId)
{
var userCount = _users.AddOrUpdate(docId, 1, (_, o) => o + 1);
await _docIdLocks.GetOrAdd(docId, new FifoAsyncLock()).WaitAsync();
return new Lock(this, docId);
}
private async Task Release(Guid docId)
{
if (!_docIdLocks.ContainsKey(docId))
throw new KeyNotFoundException($"Key not found: '{docId}'");
_docIdLocks[docId].Release();
if (!_users.ContainsKey(docId))
throw new KeyNotFoundException($"Key not found: '{docId}'");
if (--_users[docId] == 0)
{
_docIdLocks.TryRemove(docId, out _);
_users.TryRemove(docId, out _);
}
}
private class Lock : IAsyncDisposable
{
private readonly DocIdLocker _parent;
private readonly Guid _docId;
public Lock(DocIdLocker parent, Guid docId)
{
_parent = parent;
_docId = docId;
}
public ValueTask DisposeAsync() => new (_parent.Release(_docId));
}
public void Dispose()
{
if (_disposed)
return;
foreach (var item in _docIdLocks.Values)
item.Dispose();
_users.Clear();
_disposed = true;
}
}
但我的测试仍然显示 'a' 的顺序未保留。
我想,也许有些线程在锁不存在时获取元素,并乱序处理它们。好吧,这一切都变得很难推理,现在我脑子里乱七八糟。
是否有一种简单而优雅的方法来实现我想要实现的目标?
解决这个问题最简单的方法可能是根据命令的相关属性对命令进行分组,然后并行处理这些组而不是单个文档。然后对每一组执行一个顺序的foreach
循环,将相关的文档一个一个处理。示例:
string[] documents = new[] { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' }
.Select((item, index) => $"{item}-{index}")
.ToArray();
Console.WriteLine($"Documents: [{String.Join(", ", documents)}]");
var grouped = documents.GroupBy(item => item[0]); // Group by the first char
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(grouped, options, grouping =>
{
foreach (var document in grouping)
{
Console.WriteLine($"Processing: {document}");
Thread.Sleep(500); // Simulate a CPU-bound or blocking operation
}
});
输出:
Documents: [n-0, a-1, s-2, j-3, a-4, l-5, v-6, g-7, a-8, f-9, f-10]
Processing: a-1
Processing: n-0
Processing: s-2
Processing: j-3
Processing: a-4
Processing: l-5
Processing: v-6
Processing: g-7
Processing: f-9
Processing: a-8
Processing: f-10
GroupBy
LINQ 运算符的排序行为已明确定义。根据文档:
The
IGrouping<TKey,TElement>
objects are yielded in an order based on the order of the elements in source that produced the first key of eachIGrouping<TKey,TElement>
. Elements in a grouping are yielded in the order that the elements that produced them appear in source.
这种方法有一些缺点:
- 在开始并行处理之前必须完全枚举源序列。如果源序列是延迟可枚举的,例如
BlockingCollection<T>
包含来自并行生产者的实时项目,这可能是一个问题。 - 处理顺序由第一个唯一键在源序列中出现的顺序决定,而不是由项目本身的顺序决定。因此,例如如果源是 (A, B, A, A, A, A, A) 并且并行度为 1,则 B 项将最后处理。
- 生成的分区方案可能不平衡。如果存在包含大量元素的键,并且这些键在源序列中出现较晚,则并行处理可能会在操作结束时并行化程度降低。为缓解此问题,最好根据组包含的项目数按降序对组重新排序。
下面是自定义 LINQ 运算符 ToConsumableGroupings
,它可能比标准 GroupBy
运算符更适合这种情况。它解决了前面提到的大部分问题,因为它懒惰地枚举源序列,并发出分组 on-the-go。它与 GroupBy
运算符具有相同的签名:
/// <summary>
/// Groups the elements of a sequence into consumable groupings, according to
/// a specified key selector function.
/// </summary>
/// <remarks>
/// For each key, more than one groupings can be emitted. A new grouping can be emitted
/// if the previously emitted grouping for the same key has been fully consumed.
/// </remarks>
public static IEnumerable<IGrouping<TKey, TSource>>
ToConsumableGroupings<TKey, TSource>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> keyComparer = default)
{
var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
foreach (var item in source)
{
var key = keySelector(item);
lock (perKey)
{
if (perKey.TryGetValue(key, out var queue))
{
queue.Enqueue(item); continue;
}
queue = perKey[key] = new Queue<TSource>();
queue.Enqueue(item);
}
yield return new Grouping<TKey, TSource>(key, GetGroup(key));
}
IEnumerable<TSource> GetGroup(TKey key)
{
while (true)
{
TSource item;
lock (perKey)
{
var queue = perKey[key];
if (queue.Count == 0) { perKey.Remove(key); break; }
item = queue.Dequeue();
}
yield return item;
}
}
}
private class Grouping<TKey, TSource> : IGrouping<TKey, TSource>
{
private readonly TKey _key;
private readonly IEnumerable<TSource> _sequence;
public Grouping(TKey key, IEnumerable<TSource> sequence)
{
_key = key;
_sequence = sequence;
}
public TKey Key => _key;
public IEnumerator<TSource> GetEnumerator() => _sequence.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
用法示例:
var grouped = documents.ToConsumableGroupings(item => item[0]);
与 GroupBy
运算符不同,ToConsumableGroupings
运算符发出 non-materialized 组,预计只会被枚举(消耗)一次。