
Best way to order parallel processing of commands with the same attribute

我有一列命令。其中一些具有相同的属性,例如“文档 ID”。我需要能够并行处理它们,但有一个限制:具有相同功能的命令应按照它们在队列中出现的顺序进行处理。

例如: 我的队列是 [n, a, s, j, a, l, v, g, a, f, f],其中字母是 DocumentId。我需要并行处理,但是 'a' 的处理应该按照它们在队列中出现的顺序进行,即 [1, 4, 8],其中数字是队列中字母的 ID .因此,这些元素的处理顺序无关紧要,只要 [8][4] 之后即可[1](它们之间有任意数量的中间项)。

首先,我尝试对 DocumentId 进行 SemaphoreSlim 锁定。这意味着,如果我们处理一个项目,如果其他线程应该处理同一个项目,我们就会阻塞它们。 这没有用,因为 SemaphoreSlim 不保证解除阻塞的 FIFO 顺序。

然后,我围绕 SemaphoreSlim 做了一个包装器,以强制执行 FIFO 解锁:

public class FifoAsyncLock : IDisposable
    private readonly SemaphoreSlim _sem = new (1, 1);
    private readonly ConcurrentQueue<TaskCompletionSource> _queue = new ();

    public async Task WaitAsync()
        var tcsE = new TaskCompletionSource();

        await _sem.WaitAsync();
        if (_queue.TryDequeue(out var tcsD))

        await tcsE.Task;

    public void Release()

    public void Dispose()

我在 class 中使用它,我在其中为每个 DocumentId 存储了一个信号量,并且还记录了有多少锁定用户正在等待解锁。如果最后一个用户释放锁,则删除(因为内存):

public class DocIdLocker : IDisposable
    private readonly ConcurrentDictionary<Guid, FifoAsyncLock> _docIdLocks = new ();
    private readonly ConcurrentDictionary<Guid, int> _users = new ();

    private bool _disposed;

    public async Task<IAsyncDisposable> AquireLockAsync(Guid docId)
        var userCount = _users.AddOrUpdate(docId, 1, (_, o) => o + 1);
        await _docIdLocks.GetOrAdd(docId, new FifoAsyncLock()).WaitAsync();
        return new Lock(this, docId);

    private async Task Release(Guid docId)
        if (!_docIdLocks.ContainsKey(docId))
            throw new KeyNotFoundException($"Key not found: '{docId}'");


        if (!_users.ContainsKey(docId))
            throw new KeyNotFoundException($"Key not found: '{docId}'");

        if (--_users[docId] == 0)
            _docIdLocks.TryRemove(docId, out _);
            _users.TryRemove(docId, out _);

    private class Lock : IAsyncDisposable
        private readonly DocIdLocker _parent;
        private readonly Guid _docId;

        public Lock(DocIdLocker parent, Guid docId)
            _parent = parent;
            _docId = docId;

        public ValueTask DisposeAsync() => new (_parent.Release(_docId));

    public void Dispose()
        if (_disposed)

        foreach (var item in _docIdLocks.Values)

         _disposed = true;

但我的测试仍然显示 'a' 的顺序未保留。




string[] documents = new[] { 'n', 'a', 's', 'j', 'a', 'l', 'v', 'g', 'a', 'f', 'f' }
    .Select((item, index) => $"{item}-{index}")
Console.WriteLine($"Documents: [{String.Join(", ", documents)}]");

var grouped = documents.GroupBy(item => item[0]); // Group by the first char

ParallelOptions options = new()
    MaxDegreeOfParallelism = Environment.ProcessorCount

Parallel.ForEach(grouped, options, grouping =>
    foreach (var document in grouping)
        Console.WriteLine($"Processing: {document}");
        Thread.Sleep(500); // Simulate a CPU-bound or blocking operation


Documents: [n-0, a-1, s-2, j-3, a-4, l-5, v-6, g-7, a-8, f-9, f-10]
Processing: a-1
Processing: n-0
Processing: s-2
Processing: j-3
Processing: a-4
Processing: l-5
Processing: v-6
Processing: g-7
Processing: f-9
Processing: a-8
Processing: f-10

Live demo.

GroupBy LINQ 运算符的排序行为已明确定义。根据文档:

The IGrouping<TKey,TElement> objects are yielded in an order based on the order of the elements in source that produced the first key of each IGrouping<TKey,TElement>. Elements in a grouping are yielded in the order that the elements that produced them appear in source.


  1. 在开始并行处理之前必须完全枚举源序列。如果源序列是延迟可枚举的,例如 BlockingCollection<T> 包含来自并行生产者的实时项目,这可能是一个问题。
  2. 处理顺序由第一个唯一键在源序列中出现的顺序决定,而不是由项目本身的顺序决定。因此,例如如果源是 (A, B, A, A, A, A, A) 并且并行度为 1,则 B 项将最后处理。
  3. 生成的分区方案可能不平衡。如果存在包含大量元素的键,并且这些键在源序列中出现较晚,则并行处理可能会在操作结束时并行化程度降低。为缓解此问题,最好根据组包含的项目数按降序对组重新排序。

下面是自定义 LINQ 运算符 ToConsumableGroupings,它可能比标准 GroupBy 运算符更适合这种情况。它解决了前面提到的大部分问题,因为它懒惰地枚举源序列,并发出分组 on-the-go。它与 GroupBy 运算符具有相同的签名:

/// <summary>
/// Groups the elements of a sequence into consumable groupings, according to
/// a specified key selector function.
/// </summary>
/// <remarks>
/// For each key, more than one groupings can be emitted. A new grouping can be emitted
/// if the previously emitted grouping for the same key has been fully consumed.
/// </remarks>
public static IEnumerable<IGrouping<TKey, TSource>>
    ToConsumableGroupings<TKey, TSource>(
    this IEnumerable<TSource> source,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default)
    var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
    foreach (var item in source)
        var key = keySelector(item);
        lock (perKey)
            if (perKey.TryGetValue(key, out var queue))
                queue.Enqueue(item); continue;
            queue = perKey[key] = new Queue<TSource>();
        yield return new Grouping<TKey, TSource>(key, GetGroup(key));

    IEnumerable<TSource> GetGroup(TKey key)
        while (true)
            TSource item;
            lock (perKey)
                var queue = perKey[key];
                if (queue.Count == 0) { perKey.Remove(key); break; }
                item = queue.Dequeue();
            yield return item;

private class Grouping<TKey, TSource> : IGrouping<TKey, TSource>
    private readonly TKey _key;
    private readonly IEnumerable<TSource> _sequence;
    public Grouping(TKey key, IEnumerable<TSource> sequence)
        _key = key;
        _sequence = sequence;
    public TKey Key => _key;
    public IEnumerator<TSource> GetEnumerator() => _sequence.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();


var grouped = documents.ToConsumableGroupings(item => item[0]);

GroupBy 运算符不同,ToConsumableGroupings 运算符发出 non-materialized 组,预计只会被枚举(消耗)一次。