如何在 EF Core 3.1 中以异步方式使用 GroupBy?

How to use GroupBy in an asynchronous manner in EF Core 3.1?

当我将 GroupBy 用作对 EFCore 的 LINQ 查询的一部分时,我收到错误 System.InvalidOperationException: Client-side GroupBy is not supported

这是因为 EF Core 3.1 尝试尽可能多地在服务器端评估查询,而不是在客户端评估它们,并且调用无法转换为 SQL。

所以下面的语句不起作用,并产生上面提到的错误:

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .GroupBy(t => t.BlobNumber)
    .Select(b => b)
    .ToListAsync();

现在显然解决方案是在调用 GroupBy() 之前使用 use.AsEnumerable() 或 .ToList(),因为它明确告诉 EF Core 您想要在客户端进行分组。有关于这个的讨论on GitHub and in the Microsoft docs

var blogs = context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .AsEnumerable()
    .GroupBy(t => t.BlobNumber)
    .Select(b => b)
    .ToList();

但是,这不是异步的。我怎样才能让它异步?

如果我将 AsEnumerable() 更改为 AsAsyncEnumerable(),我会收到错误消息。如果我改为尝试将 AsEnumerable() 更改为 ToListAsync(),则 GroupBy() 命令将失败。

我正在考虑将其包装在 Task.FromResult 中,但这实际上是异步的吗?还是数据库查询还是同步的,只是后面的分组是异步的?

var blogs = await Task.FromResult(context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .AsEnumerable()
    .GroupBy(t => t.BlobNumber)
    .Select(b => b)
    .ToList());

或者如果这不起作用还有其他方法吗?

我认为你唯一的办法就是像这样做

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .ToListAsync();

var groupedBlogs = blogs.GroupBy(t => t.BlobNumber).Select(b => b).ToList();

因为 GroupBy 无论如何都会在客户端进行评估

此查询并未尝试在 SQL/EF 核心意义上对数据进行分组。不涉及聚合。

它正在加载所有详细信息行,然后将它们分批放入客户端的不同存储桶中。 EF Core 不参与其中,这是一个纯粹的客户端操作。等价于:

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .ToListAsync();

var blogsByNum = blogs.ToLookup(t => t.BlobNumber);

加速分组

batching/grouping/lookup 操作纯粹是 CPU 绑定的,因此加速它的唯一方法是并行化它,即使用所有 CPU 对数据进行分组,例如:

var blogsByNum = blogs.AsParallel()
                      .ToLookup(t => t.BlobNumber);

ToLookup 或多或少与 GroupBy().ToList() 相同 - 它根据键

将行分组到桶中

加载时分组

另一种方法是 异步加载 结果,并在结果到达时将其放入存储桶中。为此,我们需要 AsAsyncEnumerable()ToListAsync() returns一次全部结果,所以不能用。

这种方法与 ToLookup 的做法非常相似。


var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"));

var blogsByNum=new Dictionary<string,List<Blog>>();

await foreach(var blog in blogs.AsAsyncEnumerable())
{
    if(blogsByNum.TryGetValue(blog.BlobNumber,out var blogList))
    {
        blogList.Add(blog);
    }
    else
    {
        blogsByNum[blog.BlobNumber=new List<Blog>(100){blog};
    }
}

通过调用 AsAsyncEnumerable() 执行查询。虽然结果是异步到达的,所以现在我们可以在迭代时将它们添加到桶中。

capacity 参数用于列表构造函数以避免重新分配列表的内部缓冲区。

使用System.LINQ.Async

如果我们对 IAsyncEnumerable<> 本身进行 LINQ 操作,事情会容易得多。 This extension namespace provides just that. It's developed by the ReactiveX team. It's available through NuGet 当前主要版本是 4.0.

有了这个,我们可以写成:

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"));

var blogsByNum=await blogs.AsAsyncEnumerable()   individual rows asynchronously
                          .ToLookupAsync(blog=>blog.BlobNumber);

var blogsByNum=await blogs.AsAsyncEnumerable()   
                          .GroupBy(blog=>blog.BlobNumber)
                          .Select(b=>b)
                          .ToListAsync();