如何在项目准备好后立即解析 IAsyncEnumerables 列表
How to resolve lists of IAsyncEnumerables as soon as a item is ready
public async IAsyncEnumerable<Entity> FindByIds(List<string> ids)
{
List<List<string>> splitIdsList = ids.Split(5);
var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();
foreach (var entities in entityList)
{
await foreach (var entity in entities)
{
yield return entity;
}
}
}
private async IAsyncEnumerable<Entity> FindByIdsQuery(List<string> ids)
{
var result = await Connection.QueryAsync(query, new {ids})
foreach (var entity in result)
{
yield return entity;
}
}
如果我向这个函数发送 25 个 ID。第一个 FindByIdsQuery 需要 5000 毫秒。其他 4 个 FindByIdsQuery 需要 100ms。然后这个解决方案不会输出任何实体,直到 5000 毫秒之后。是否有任何解决方案可以在有人输出时立即开始输出实体。或者,如果您可以使用 Task.WhenAny
.
在 Task 中执行类似操作
明确一点:5 个查询中的任何一个都可能需要 5000 毫秒。
问题是您的代码让他们等待。这里的异步 foreach 没有任何意义,因为 - 你不执行异步。
你这样做:
var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();
这是可以 运行 异步的查询部分,但它不是,因为您将整个结果集具体化到一个列表中。然后您继续对其进行异步循环,但此时所有结果都已在内存中。
获得异步的方法就是去掉 ToList。将查询转储到 foreach 中,不要将其具体化到内存中。 async foreach 应该命中 ef 级查询(而不是查询结果),这样您就可以在从数据库中获取信息时对其进行处理。 ToList 有效地绕过了这个。
还了解到 EF 无法有效地处理多个 id 查找。唯一可能的方法是将它们放入一个数组中并包含一个 SQL "IN" 子句——对于较大的数字来说效率非常低,因为它强制进行 table 扫描。有效的 SQL 方法是将它们加载到具有统计信息的 table 值变量中并使用连接,但在 EF 中无法做到这一点 - 限制之一。长 IN 子句的 SQL 限制有据可查。 EF 端的限制没有,但它们仍然存在。
从你的评论中,我明白了你的问题。您基本上要寻找的是某种“SelectMany
”运算符。此运算符将开始等待所有 IAsyncEnumerables
和 return 项的顺序,而不管源异步枚举的顺序如何。
我希望默认的 AsyncEnumerable.SelectMany
可以做到这一点,但我发现这不是真的。它遍历源可枚举,然后遍历整个内部可枚举,然后继续下一步。所以我一起破解了 SelectMany
变体,它可以同时正确等待所有内部异步枚举。 请注意,我不保证正确性,也不保证安全性。零错误处理。
/// <summary>
/// Starts all inner IAsyncEnumerable and returns items from all of them in order in which they come.
/// </summary>
public static async IAsyncEnumerable<TItem> SelectManyAsync<TItem>(IEnumerable<IAsyncEnumerable<TItem>> source)
{
// get enumerators from all inner IAsyncEnumerable
var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList();
List<Task<(IAsyncEnumerator<TItem>, bool)>> runningTasks = new List<Task<(IAsyncEnumerator<TItem>, bool)>>();
// start all inner IAsyncEnumerable
foreach (var asyncEnumerator in enumerators)
{
runningTasks.Add(MoveNextWrapped(asyncEnumerator));
}
// while there are any running tasks
while (runningTasks.Any())
{
// get next finished task and remove it from list
var finishedTask = await Task.WhenAny(runningTasks);
runningTasks.Remove(finishedTask);
// get result from finished IAsyncEnumerable
var result = await finishedTask;
var asyncEnumerator = result.Item1;
var hasItem = result.Item2;
// if IAsyncEnumerable has item, return it and put it back as running for next item
if (hasItem)
{
yield return asyncEnumerator.Current;
runningTasks.Add(MoveNextWrapped(asyncEnumerator));
}
}
// don't forget to dispose, should be in finally
foreach (var asyncEnumerator in enumerators)
{
await asyncEnumerator.DisposeAsync();
}
}
/// <summary>
/// Helper method that returns Task with tuple of IAsyncEnumerable and it's result of MoveNextAsync.
/// </summary>
private static async Task<(IAsyncEnumerator<TItem>, bool)> MoveNextWrapped<TItem>(IAsyncEnumerator<TItem> asyncEnumerator)
{
var res = await asyncEnumerator.MoveNextAsync();
return (asyncEnumerator, res);
}
然后您可以使用它来合并所有枚举而不是第一个 foreach:
var entities = SelectManyAsync(splitIdsList.Select(x => FindByIdsQuery(x)));
return entities;
public async IAsyncEnumerable<Entity> FindByIds(List<string> ids)
{
List<List<string>> splitIdsList = ids.Split(5);
var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();
foreach (var entities in entityList)
{
await foreach (var entity in entities)
{
yield return entity;
}
}
}
private async IAsyncEnumerable<Entity> FindByIdsQuery(List<string> ids)
{
var result = await Connection.QueryAsync(query, new {ids})
foreach (var entity in result)
{
yield return entity;
}
}
如果我向这个函数发送 25 个 ID。第一个 FindByIdsQuery 需要 5000 毫秒。其他 4 个 FindByIdsQuery 需要 100ms。然后这个解决方案不会输出任何实体,直到 5000 毫秒之后。是否有任何解决方案可以在有人输出时立即开始输出实体。或者,如果您可以使用 Task.WhenAny
.
明确一点:5 个查询中的任何一个都可能需要 5000 毫秒。
问题是您的代码让他们等待。这里的异步 foreach 没有任何意义,因为 - 你不执行异步。
你这样做:
var entityList = splitIdsList.Select(x => FindByIdsQuery(x)).ToList();
这是可以 运行 异步的查询部分,但它不是,因为您将整个结果集具体化到一个列表中。然后您继续对其进行异步循环,但此时所有结果都已在内存中。
获得异步的方法就是去掉 ToList。将查询转储到 foreach 中,不要将其具体化到内存中。 async foreach 应该命中 ef 级查询(而不是查询结果),这样您就可以在从数据库中获取信息时对其进行处理。 ToList 有效地绕过了这个。
还了解到 EF 无法有效地处理多个 id 查找。唯一可能的方法是将它们放入一个数组中并包含一个 SQL "IN" 子句——对于较大的数字来说效率非常低,因为它强制进行 table 扫描。有效的 SQL 方法是将它们加载到具有统计信息的 table 值变量中并使用连接,但在 EF 中无法做到这一点 - 限制之一。长 IN 子句的 SQL 限制有据可查。 EF 端的限制没有,但它们仍然存在。
从你的评论中,我明白了你的问题。您基本上要寻找的是某种“SelectMany
”运算符。此运算符将开始等待所有 IAsyncEnumerables
和 return 项的顺序,而不管源异步枚举的顺序如何。
我希望默认的 AsyncEnumerable.SelectMany
可以做到这一点,但我发现这不是真的。它遍历源可枚举,然后遍历整个内部可枚举,然后继续下一步。所以我一起破解了 SelectMany
变体,它可以同时正确等待所有内部异步枚举。 请注意,我不保证正确性,也不保证安全性。零错误处理。
/// <summary>
/// Starts all inner IAsyncEnumerable and returns items from all of them in order in which they come.
/// </summary>
public static async IAsyncEnumerable<TItem> SelectManyAsync<TItem>(IEnumerable<IAsyncEnumerable<TItem>> source)
{
// get enumerators from all inner IAsyncEnumerable
var enumerators = source.Select(x => x.GetAsyncEnumerator()).ToList();
List<Task<(IAsyncEnumerator<TItem>, bool)>> runningTasks = new List<Task<(IAsyncEnumerator<TItem>, bool)>>();
// start all inner IAsyncEnumerable
foreach (var asyncEnumerator in enumerators)
{
runningTasks.Add(MoveNextWrapped(asyncEnumerator));
}
// while there are any running tasks
while (runningTasks.Any())
{
// get next finished task and remove it from list
var finishedTask = await Task.WhenAny(runningTasks);
runningTasks.Remove(finishedTask);
// get result from finished IAsyncEnumerable
var result = await finishedTask;
var asyncEnumerator = result.Item1;
var hasItem = result.Item2;
// if IAsyncEnumerable has item, return it and put it back as running for next item
if (hasItem)
{
yield return asyncEnumerator.Current;
runningTasks.Add(MoveNextWrapped(asyncEnumerator));
}
}
// don't forget to dispose, should be in finally
foreach (var asyncEnumerator in enumerators)
{
await asyncEnumerator.DisposeAsync();
}
}
/// <summary>
/// Helper method that returns Task with tuple of IAsyncEnumerable and it's result of MoveNextAsync.
/// </summary>
private static async Task<(IAsyncEnumerator<TItem>, bool)> MoveNextWrapped<TItem>(IAsyncEnumerator<TItem> asyncEnumerator)
{
var res = await asyncEnumerator.MoveNextAsync();
return (asyncEnumerator, res);
}
然后您可以使用它来合并所有枚举而不是第一个 foreach:
var entities = SelectManyAsync(splitIdsList.Select(x => FindByIdsQuery(x)));
return entities;