如何异步查询两个 IAsyncEnumerables

How to query two IAsyncEnumerables asynchronously

我有两种方法连接到 Foo 的两个不同来源,其中 return 两个 IAsyncEnumerable<Foo>。在能够处理它们之前,我需要从两个来源获取所有 Foo

问题:我想查询两个来源同时(异步),即。在开始枚举 Source2 之前,没有等待 Source1 完成枚举。根据我的理解,这就是下面方法 SequentialSourcesQuery 示例中发生的情况,对吗?

对于常规任务,我会先启动第一个任务,然后启动第二个任务,然后调用 await Task.WhenAll。但是我对如何处理 IAsyncEnumerable.

有点困惑
public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        List<Foo> foos = new List<Foo>();

        await foreach (Foo foo1 in Source1())
        {
            foos.Add(foo1);
        }

        await foreach (Foo foo2 in Source2())
        { //doesn't start until Source1 completed the enumeration? 
            foos.Add(foo2);
        }

        return foos;
    }
}

您可以编写另一个异步本地方法 returns 任务。

Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
        List<Foo> foos = new List<Foo>();
        await foreach (Foo foo1 in values)
        {
            foos.Add(foo1);
        }        
        return foos;
};

并这样称呼它:

Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());

await Task.WhenAll(task1, task2);

整个代码为:

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        var asyncEnumerator = Source1().GetAsyncEnumerator();
        Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
            List<Foo> foos2 = new List<Foo>();
            await foreach (Foo foo in values)
            {
                foos2.Add(foo);
            }        
            return foos2;
        };
        
        Task<List<Foo>> task1 = readValues(Source1());
        Task<List<Foo>> task2 = readValues(Source2());
        
        await Task.WhenAll(task1, task2);
        
        List<Foo> foos = new List<Foo>(task1.Result.Count + task2.Result.Count);
        foos.AddRange(task1.Result);
        foos.AddRange(task2.Result);

        return foos;
    }
}

如果您有两个 IAsyncEnumerable<T> 作为源并且不关心传入数据的顺序,您可以使用如下方法交错处理数据。

public static class AsyncEnumerableExt
{
    public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first, IAsyncEnumerable<T> second)
    {
        var enum1 = first.GetAsyncEnumerator();
        var enum2 = second.GetAsyncEnumerator();

        var nextWait1 = enum1.MoveNextAsync().AsTask();
        var nextWait2 = enum2.MoveNextAsync().AsTask();

        do
        {
            var task = await Task.WhenAny(nextWait1, nextWait2).ConfigureAwait(false);

            if (task == nextWait1)
            {
                yield return enum1.Current;

                nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
            }
            else if (task == nextWait2)
            {
                yield return enum2.Current;

                nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
            }
        } while (nextWait1 != null && nextWait2 != null);

        while (nextWait1 != null)
        {
            if (!await nextWait1.ConfigureAwait(false))
            {
                nextWait1 = null;
            }
            else
            {
                yield return enum1.Current;
                nextWait1 = enum1.MoveNextAsync().AsTask();
            }
        }

        while (nextWait2 != null)
        {
            if (!await nextWait2.ConfigureAwait(false))
            {
                nextWait2 = null;
            }
            else
            {
                yield return enum2.Current;
                nextWait2 = enum2.MoveNextAsync().AsTask();
            }
        }
    }
}

然后您可以使用一个 await foreach 来使用数据并将数据存储在列表中。

您可以利用库 System.Linq.Async and System.Interactive.Async (owned by the RxTeam who are part of the .NET Foundation). They contain operators like Merge and ToListAsync 轻松解决您的问题。

// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
    this IAsyncEnumerable<TSource> source,
    CancellationToken cancellationToken = default);

把所有东西放在一起:

public Task<List<Foo>> SequentialSourcesQuery()
{
    return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}

意识到这些库的重点是提供一组丰富的功能,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的 很可能会胜过上述基于运算符的方法。