如果与 async/await 一起使用(使用 Dapper 从 SQL 服务器流式传输数据),返回 IEnumerable 会发生什么情况?

What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)?

我正在使用 Dapper 从 SQL 服务器中的一个非常大的集合流式传输数据。它在返回 IEnumerable 和调用 Query() 时工作正常,但是当我切换到 QueryAsync() 时,程序似乎试图从 SQL 服务器读取所有数据而不是流式传输。

根据这个 question,它应该可以与我正在做的 buffered: false 一起正常工作,但是问题没有提到 async/await.

现在根据这个question,用QueryAsync()做我想做的事情并不简单。

我是否正确理解为 async/await 切换上下文时迭代可枚举?

另一个问题,当新的 C#8 异步流可用时,这是否可以做?

2020 年 3 月更新

.NET Core 3.0(和 3.1)现已发布,全面支持异步流。 Microsoft.Bcl.AsyncInterfaces adds support for them to .NET Standard 2.0 and .NET Framework 4.6.1+, although 4.7.2 should be used for sanity reasons. As the docs on .NET Standard implementation support explain

While NuGet considers .NET Framework 4.6.1 as supporting .NET Standard 1.5 through 2.0, there are several issues with consuming .NET Standard libraries that were built for those versions from .NET Framework 4.6.1 projects.

For .NET Framework projects that need to use such libraries, we recommend that you upgrade the project to target .NET Framework 4.7.2 or higher.

原答案

如果你check the source code,你会发现你的怀疑几乎是正确的。当 buffered 为 false 时,QueryAsync 将同步流式传输

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}

正如评论所解释的那样,当 return 类型预期为 IEnumerable 时,无法使用 ReadAsync。这就是必须引入 C# 8 的异步枚举的原因。

ExecuteReaderSync 的代码是:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

它使用 Read 而不是 ReadAsync

C#8 异步流将允许将其重写为 return 和 IAsyncEnumerable。简单地更改语言版本并不能解决问题。

鉴于当前关于异步流的文档,这可能看起来像:

private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (await reader.ReadAsync())
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
         command.OnCompleted();
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

Buuuuuut 异步流是只能在 .NET Core 上运行的功能之一,可能尚未实现。当我试图在 Sharplab.io 中写一个时,Kaboom。 [connection lost, reconnecting…]

在 dapper 的上下文中 特别是 ,是的:它需要一个不同的 API 正如@Panagiotis 的出色回答所解释的那样。以下内容并非 答案 本身,而是面临相同挑战的实施者可能希望考虑的额外上下文。

我还没有"spiked"这个小巧玲珑的(虽然我 SE.Redis),我在各种选择之间左右为难:

  1. 只为 .NET Core 添加一个新的 API,return使用适当的异步可枚举类型
  2. 将现有的 API 作为重大更改("major" 等)完全粉碎,将其更改为 return 异步可枚举类型

我们可能会选择“1”,但我不得不说,第二个选项非常诱人,原因很充分:

  • 现有的 API 可能不会做人们期望的事情
  • 我们希望新代码开始使用它

但奇怪的是 IAsyncEnumerable<T> 的 .NET Core 3.0 特性——显然 Dapper 不仅仅针对 .NET Core 3.0;我们可以:

  1. 将功能限制为 .NET Core 3.0,并且 return IAsyncEnumerable<T>
  2. 限制为.NET Core 3.0,并且returnIAsyncEnumerable<T>
  3. 对以前的框架依赖 System.Linq.Async(这不是 "official",但对于我们的目的来说已经足够正式了),并且 return IAsyncEnumerable<T>
  4. return 自定义可枚举类型 实际上 IAsyncEnumerable<T>(但在可用时实现 IAsyncEnumerable<T>),并手动实现状态机 - foreach 的鸭子类型性质意味着只要我们的自定义可枚举类型提供正确的方法
  5. ,它就可以正常工作

我认为我们可能选择选项 3,但重申:是的,有些事情需要改变。

(这应该是评论 // 信誉不够,到目前为止)

Marc Gravell 在他的 中提到 IAsyncEnumerable<T> 更可取,但由于对 NET Core 3.0 的依赖,最好依赖 System.Linq.Async(可以认为是 "official-enough")...

在这种情况下,我想到了 https://github.com/Dasync/AsyncEnumerable(MIT 许可证): 它旨在帮助

... to (a) create an element provider, where producing an element can take a lot of time due to dependency on other asynchronous events (e.g. wait handles, network streams), and (b) a consumer that processes those element as soon as they are ready without blocking the thread (the processing is scheduled on a worker thread instead).

再来一句,回复:"What happens when C# 8.0 is released?" (FAQ)

C# 8.0 should have a feature of Async Streams. When the version of the language is finally realeased, it should be a straight-forward upgrade path for your application.