How to lazily partition an IAsyncEnumerable?

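I have an IAsyncEnumerable that returns what is essentially a sequence of Key/IEnumerable<Value> pairs. I have code that consumes this and other similar enumerables, and it assumes that it will receive a unique set of keys. One of my data sources does not obey this constraint. It does, however, keep duplicate keys grouped together. (You won't see [k1, k2, k1].)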
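To make the constraint concrete: the source may yield k1, k1, k2, but never k1, k2, k1. The following is a minimal sketch (assuming .NET Core 3.0 or later for async streams) of a lazy check for that property. `Produce` and `KeysAreGroupedAdjacentlyAsync` are hypothetical helper names, not part of any library; only the keys of finished runs are remembered, never the values.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AdjacencyCheck
{
    // Hypothetical helper: turns an array into an asynchronous sequence.
    public static async IAsyncEnumerable<T> Produce<T>(params T[] items)
    {
        foreach (var item in items)
        {
            await Task.Yield(); // simulate asynchronous arrival
            yield return item;
        }
    }

    // Hypothetical helper: returns false as soon as a key reappears after a
    // different key (e.g. k1, k2, k1). Only the keys of finished runs are
    // stored; the elements themselves are never buffered.
    public static async Task<bool> KeysAreGroupedAdjacentlyAsync<TSource, TKey>(
        IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        IEqualityComparer<TKey> keyComparer = null)
    {
        keyComparer ??= EqualityComparer<TKey>.Default;
        var finishedKeys = new HashSet<TKey>(keyComparer);
        bool hasPrevious = false;
        TKey previousKey = default;
        await foreach (var item in source.ConfigureAwait(false))
        {
            var key = keySelector(item);
            if (hasPrevious && !keyComparer.Equals(key, previousKey))
                finishedKeys.Add(previousKey); // the previous run just ended
            if (finishedKeys.Contains(key))
                return false; // this key already had a run of its own
            previousKey = key;
            hasPrevious = true;
        }
        return true;
    }
}
```

For example, a source of `("k1", new[] { 1, 2 })`, `("k1", new[] { 3 })`, `("k2", new[] { 4 })` passes the check with `p => p.Item1` as the key selector, while keys k1, k2, k1 fail it.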

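This should be fairly simple to solve with a wrapper that partitions the data by key and concatenates the values, except that I don't see any usable partitioning operator in System.Linq.Async. There are GroupBy and ToLookup, but both are eager operators that consume the entire sequence as soon as enumeration starts. Because of the large amount of data involved, that isn't suitable for my purposes.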

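Is there any simple way to partition an IAsyncEnumerable similar to GroupBy, grouping the input according to a key selector, but keeping the behavior fully lazy and producing a new grouping on demand whenever the key changes?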

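Edit: I checked whether MoreLINQ has something similar and found GroupAdjacent, but the code shows that, although it doesn't eagerly consume the entire input sequence, it still eagerly consumes the entire group whenever it starts a new one. I'm looking for a method that returns a lazy enumerable inside each grouping. This is trickier than it sounds!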

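Here is a GroupAdjacent operator for asynchronous sequences, similar to the operator of the same name in the MoreLINQ package, with the difference that it doesn't buffer the elements of the groupings it emits. The groupings must be enumerated fully and in the correct order, one grouping at a time; otherwise an InvalidOperationException is thrown.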

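This implementation requires the System.Linq.Async package, because it emits groupings that implement the IAsyncGrouping<out TKey, out TElement> interface. It also uses ArgumentNullException.ThrowIfNull, which requires .NET 6 or later.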

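using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq; // IAsyncGrouping<TKey, TElement> (System.Linq.Async)
using System.Runtime.CompilerServices;
using System.Threading;

/// <summary>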
/// Groups the adjacent elements of a sequence according to a specified
/// key selector function.
/// </summary>
/// <remarks>
/// The groups don't contain buffered elements.
/// Enumerating the groups in the correct order is mandatory.
/// </remarks>
public static IAsyncEnumerable<IAsyncGrouping<TKey, TSource>>
    GroupAdjacent<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        IEqualityComparer<TKey> keyComparer = null)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(keySelector);
    keyComparer ??= EqualityComparer<TKey>.Default;
    return Implementation();

    async IAsyncEnumerable<IAsyncGrouping<TKey, TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Tuple<TSource, TKey, bool> sharedState = null;
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                yield break;
            var firstItem = enumerator.Current;
            var firstKey = keySelector(firstItem);
            sharedState = new(firstItem, firstKey, true);

            Tuple<TSource, TKey, bool> previousState = null;
            while (true)
            {
                var state = Volatile.Read(ref sharedState);
                if (ReferenceEquals(state, previousState))
                    throw new InvalidOperationException("Out of order enumeration.");
                var (item, key, exists) = state;
                if (!exists) yield break;
                previousState = state;
                yield return new AsyncGrouping<TKey, TSource>(key, GetAdjacent(state));
            }
        }
        finally { await enumerator.DisposeAsync().ConfigureAwait(false); }

        async IAsyncEnumerable<TSource> GetAdjacent(Tuple<TSource, TKey, bool> state)
        {
            if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
                throw new InvalidOperationException("Out of order enumeration.");
            var (stateItem, stateKey, stateExists) = state;
            Debug.Assert(stateExists);
            yield return stateItem;
            Tuple<TSource, TKey, bool> nextState;
            while (true)
            {
                if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
                    throw new InvalidOperationException("Out of order enumeration.");
                if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                {
                    nextState = new(default, default, false);
                    break;
                }
                var item = enumerator.Current;
                var key = keySelector(item);
                if (!keyComparer.Equals(key, stateKey))
                {
                    nextState = new(item, key, true);
                    break;
                }
                yield return item;
            }
            if (!ReferenceEquals(Interlocked.CompareExchange(
                ref sharedState, nextState, state), state))
                throw new InvalidOperationException("Out of order enumeration.");
        }
    }
}

private class AsyncGrouping<TKey, TElement> : IAsyncGrouping<TKey, TElement>
{
    private readonly TKey _key;
    private readonly IAsyncEnumerable<TElement> _sequence;

    public AsyncGrouping(TKey key, IAsyncEnumerable<TElement> sequence)
    {
        _key = key;
        _sequence = sequence;
    }

    public TKey Key => _key;

    public IAsyncEnumerator<TElement> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        return _sequence.GetAsyncEnumerator(cancellationToken);
    }
}
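The boundary test at the heart of GetAdjacent (compare each element's key with the current group's key and stop at the first mismatch) works only because duplicate keys are adjacent. As a sketch of that idea in isolation, here is a hypothetical flat operator `TagGroupStarts` (not part of System.Linq.Async or MoreLINQ) that marks each element that begins a new run. It is an alternative shape, not the answer's operator, and `Produce` is a hypothetical test helper.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class GroupBoundaries
{
    // Hypothetical helper: turns an array into an asynchronous sequence.
    public static async IAsyncEnumerable<T> Produce<T>(params T[] items)
    {
        foreach (var item in items)
        {
            await Task.Yield(); // simulate asynchronous arrival
            yield return item;
        }
    }

    // Hypothetical operator: tags each element with whether it starts a new
    // run of equal keys. Since duplicate keys are adjacent, one comparison
    // with the current run's key is enough; nothing is buffered.
    public static async IAsyncEnumerable<(TSource Item, TKey Key, bool StartsGroup)>
        TagGroupStarts<TSource, TKey>(
            this IAsyncEnumerable<TSource> source,
            Func<TSource, TKey> keySelector,
            IEqualityComparer<TKey> keyComparer = null,
            [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        keyComparer ??= EqualityComparer<TKey>.Default;
        bool isFirst = true;
        TKey currentKey = default;
        await foreach (var item in source
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            var key = keySelector(item);
            bool startsGroup = isFirst || !keyComparer.Equals(key, currentKey);
            if (startsGroup) currentKey = key; // a new run begins here
            isFirst = false;
            yield return (item, key, startsGroup);
        }
    }
}
```

The trade-off is that the output is flat rather than nested: there is no ordering contract to violate, but the consumer has to start a new merged entry itself whenever StartsGroup is true, instead of receiving an IAsyncGrouping per run.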

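Usage example (AsyncGrouping is declared private, so this code has to live in the same class as the operator):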

IAsyncEnumerable<IGrouping<string, double>> source = //...

IAsyncEnumerable<IAsyncGrouping<string, double>> merged = source
    .GroupAdjacent(g => g.Key)
    .Select(gg => new AsyncGrouping<string, double>(
        gg.Key, gg.Select(g => g.ToAsyncEnumerable()).Concat()));

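This example starts with a sequence of groupings, and the goal is to merge any adjacent groupings that share the same key into a single asynchronous grouping containing all of their elements. After applying the GroupAdjacent(g => g.Key) operator, we get this type: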

IAsyncEnumerable<IAsyncGrouping<string, IGrouping<string, double>>>

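So at this stage, each asynchronous grouping contains inner groupings rather than individual elements. We need to Concat this nested structure to get the result we want. The Concat operator lives in the System.Interactive.Async package and has the following signature: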

public static IAsyncEnumerable<TSource> Concat<TSource>(
    this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources);

The ToAsyncEnumerable operator (System.Linq.Async) is applied to each synchronous inner grouping, so that the nested sequence matches this signature.
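For readers who would rather avoid the System.Interactive.Async dependency, the flattening step can be sketched by hand. `FlattenAsync` below is a hypothetical name and is not how Concat is implemented. It accepts synchronous inner sequences directly, so the ToAsyncEnumerable conversion isn't needed. Assuming the usual covariance of IAsyncEnumerable<out T>, it should be possible to write `gg.FlattenAsync()` in place of `gg.Select(g => g.ToAsyncEnumerable()).Concat()`, but that substitution is untested here.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class FlattenExtensions
{
    // Hypothetical helper: turns an array into an asynchronous sequence.
    public static async IAsyncEnumerable<T> Produce<T>(params T[] items)
    {
        foreach (var item in items)
        {
            await Task.Yield(); // simulate asynchronous arrival
            yield return item;
        }
    }

    // Hypothetical operator: flattens an asynchronous sequence of synchronous
    // inner sequences, yielding every inner element in order. Each inner
    // sequence is enumerated fully before the next outer element is requested.
    public static async IAsyncEnumerable<T> FlattenAsync<T>(
        this IAsyncEnumerable<IEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var inner in sources
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            foreach (var item in inner)
                yield return item;
        }
    }
}
```

Because the inner groupings are consumed completely and in sequence, this flattening also respects the ordering contract of the GroupAdjacent operator above.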