如何异步查询两个 IAsyncEnumerables
How to query two IAsyncEnumerables asynchronously
我有两种方法连接到 Foo
的两个不同来源,其中 return 两个 IAsyncEnumerable<Foo>
。在能够处理它们之前,我需要从两个来源获取所有 Foo
。
问题:我想查询两个来源同时(异步),即。在开始枚举 Source2
之前,没有等待 Source1
完成枚举。根据我的理解,这就是下面方法 SequentialSourcesQuery
示例中发生的情况,对吗?
对于常规任务,我会先启动第一个任务,然后启动第二个任务,然后调用 await Task.WhenAll
。但是我对如何处理 IAsyncEnumerable
.
有点困惑
public class FoosAsync
{
public async IAsyncEnumerable<Foo> Source1() { }
public async IAsyncEnumerable<Foo> Source2() { }
public async Task<List<Foo>> SequentialSourcesQuery()
{
List<Foo> foos = new List<Foo>();
await foreach (Foo foo1 in Source1())
{
foos.Add(foo1);
}
await foreach (Foo foo2 in Source2())
{ //doesn't start until Source1 completed the enumeration?
foos.Add(foo2);
}
return foos;
}
}
您可以编写另一个异步本地方法 returns 任务。
Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
List<Foo> foos = new List<Foo>();
await foreach (Foo foo1 in values)
{
foos.Add(foo1);
}
return foos;
};
并这样称呼它:
Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());
await Task.WhenAll(task1, task2);
整个代码为:
public class FoosAsync
{
public async IAsyncEnumerable<Foo> Source1() { }
public async IAsyncEnumerable<Foo> Source2() { }
public async Task<List<Foo>> SequentialSourcesQuery()
{
var asyncEnumerator = Source1().GetAsyncEnumerator();
Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
List<Foo> foos2 = new List<Foo>();
await foreach (Foo foo in values)
{
foos2.Add(foo);
}
return foos2;
};
Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());
await Task.WhenAll(task1, task2);
List<Foo> foos = new List<Foo>(task1.Result.Count + task2.Result.Count);
foos.AddRange(task1.Result);
foos.AddRange(task2.Result);
return foos;
}
}
如果您有两个 IAsyncEnumerable<T>
作为源并且不关心传入数据的顺序,您可以使用如下方法交错处理数据。
public static class AsyncEnumerableExt
{
public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first, IAsyncEnumerable<T> second)
{
var enum1 = first.GetAsyncEnumerator();
var enum2 = second.GetAsyncEnumerator();
var nextWait1 = enum1.MoveNextAsync().AsTask();
var nextWait2 = enum2.MoveNextAsync().AsTask();
do
{
var task = await Task.WhenAny(nextWait1, nextWait2).ConfigureAwait(false);
if (task == nextWait1)
{
yield return enum1.Current;
nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
}
else if (task == nextWait2)
{
yield return enum2.Current;
nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
}
} while (nextWait1 != null && nextWait2 != null);
while (nextWait1 != null)
{
if (!await nextWait1.ConfigureAwait(false))
{
nextWait1 = null;
}
else
{
yield return enum1.Current;
nextWait1 = enum1.MoveNextAsync().AsTask();
}
}
while (nextWait2 != null)
{
if (!await nextWait2.ConfigureAwait(false))
{
nextWait2 = null;
}
else
{
yield return enum2.Current;
nextWait2 = enum2.MoveNextAsync().AsTask();
}
}
}
}
然后您可以使用一个 await foreach
来使用数据并将数据存储在列表中。
您可以利用库 System.Linq.Async and System.Interactive.Async (owned by the RxTeam who are part of the .NET Foundation). They contain operators like Merge
and ToListAsync
轻松解决您的问题。
// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
params IAsyncEnumerable<TSource>[] sources);
// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
this IAsyncEnumerable<TSource> source,
CancellationToken cancellationToken = default);
把所有东西放在一起:
public Task<List<Foo>> SequentialSourcesQuery()
{
return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}
意识到这些库的重点是提供一组丰富的功能,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的 很可能会胜过上述基于运算符的方法。
我有两种方法连接到 Foo
的两个不同来源,其中 return 两个 IAsyncEnumerable<Foo>
。在能够处理它们之前,我需要从两个来源获取所有 Foo
。
问题:我想查询两个来源同时(异步),即。在开始枚举 Source2
之前,没有等待 Source1
完成枚举。根据我的理解,这就是下面方法 SequentialSourcesQuery
示例中发生的情况,对吗?
对于常规任务,我会先启动第一个任务,然后启动第二个任务,然后调用 await Task.WhenAll
。但是我对如何处理 IAsyncEnumerable
.
public class FoosAsync
{
public async IAsyncEnumerable<Foo> Source1() { }
public async IAsyncEnumerable<Foo> Source2() { }
public async Task<List<Foo>> SequentialSourcesQuery()
{
List<Foo> foos = new List<Foo>();
await foreach (Foo foo1 in Source1())
{
foos.Add(foo1);
}
await foreach (Foo foo2 in Source2())
{ //doesn't start until Source1 completed the enumeration?
foos.Add(foo2);
}
return foos;
}
}
您可以编写另一个异步本地方法 returns 任务。
Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
List<Foo> foos = new List<Foo>();
await foreach (Foo foo1 in values)
{
foos.Add(foo1);
}
return foos;
};
并这样称呼它:
Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());
await Task.WhenAll(task1, task2);
整个代码为:
public class FoosAsync
{
public async IAsyncEnumerable<Foo> Source1() { }
public async IAsyncEnumerable<Foo> Source2() { }
public async Task<List<Foo>> SequentialSourcesQuery()
{
var asyncEnumerator = Source1().GetAsyncEnumerator();
Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
List<Foo> foos2 = new List<Foo>();
await foreach (Foo foo in values)
{
foos2.Add(foo);
}
return foos2;
};
Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());
await Task.WhenAll(task1, task2);
List<Foo> foos = new List<Foo>(task1.Result.Count + task2.Result.Count);
foos.AddRange(task1.Result);
foos.AddRange(task2.Result);
return foos;
}
}
如果您有两个 IAsyncEnumerable<T>
作为源并且不关心传入数据的顺序,您可以使用如下方法交错处理数据。
public static class AsyncEnumerableExt
{
public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first, IAsyncEnumerable<T> second)
{
var enum1 = first.GetAsyncEnumerator();
var enum2 = second.GetAsyncEnumerator();
var nextWait1 = enum1.MoveNextAsync().AsTask();
var nextWait2 = enum2.MoveNextAsync().AsTask();
do
{
var task = await Task.WhenAny(nextWait1, nextWait2).ConfigureAwait(false);
if (task == nextWait1)
{
yield return enum1.Current;
nextWait1 = !await task.ConfigureAwait(false) ? null : enum1.MoveNextAsync().AsTask();
}
else if (task == nextWait2)
{
yield return enum2.Current;
nextWait2 = !await task.ConfigureAwait(false) ? null : enum2.MoveNextAsync().AsTask();
}
} while (nextWait1 != null && nextWait2 != null);
while (nextWait1 != null)
{
if (!await nextWait1.ConfigureAwait(false))
{
nextWait1 = null;
}
else
{
yield return enum1.Current;
nextWait1 = enum1.MoveNextAsync().AsTask();
}
}
while (nextWait2 != null)
{
if (!await nextWait2.ConfigureAwait(false))
{
nextWait2 = null;
}
else
{
yield return enum2.Current;
nextWait2 = enum2.MoveNextAsync().AsTask();
}
}
}
}
然后您可以使用一个 await foreach
来使用数据并将数据存储在列表中。
您可以利用库 System.Linq.Async and System.Interactive.Async (owned by the RxTeam who are part of the .NET Foundation). They contain operators like Merge
and ToListAsync
轻松解决您的问题。
// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
params IAsyncEnumerable<TSource>[] sources);
// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
this IAsyncEnumerable<TSource> source,
CancellationToken cancellationToken = default);
把所有东西放在一起:
public Task<List<Foo>> SequentialSourcesQuery()
{
return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}
意识到这些库的重点是提供一组丰富的功能,而不是性能或效率。因此,如果一流的性能对您的用例很重要,niki.kante 的