如何使用 SqlDataReader Return 和使用 IAsyncEnumerable

How to Return and Consume an IAsyncEnumerable with SqlDataReader

请看以下两种方法。第一个returns一个IAsyncEnumerable。第二个尝试消耗它。

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class SqlUtility
{
    public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
        string connectionString, SqlParameter[] parameters, string commandText,
        [EnumeratorCancellation]CancellationToken cancellationToken)
    {
        using (SqlConnection connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
            using (SqlCommand command = new SqlCommand(commandText, connection))
            {
                command.Parameters.AddRange(parameters);
                using (var reader = await command.ExecuteReaderAsync()
                    .ConfigureAwait(false))
                {
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    {
                        yield return reader;
                    }
                }
            }
        }
    }

    public static async Task Example()
    {
        const string connectionString =
            "Server=localhost;Database=[Redacted];Integrated Security=true";
        SqlParameter[] parameters = new SqlParameter[]
        {
            new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
        };
        const string commandText = "select * from Video where VideoID=@VideoID";
        IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
            parameters, commandText, CancellationToken.None);
        IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
        object videoID = firstRecord["VideoID"]; //Should be 1000.
        // Instead, I get this exception:
        // "Invalid attempt to call MetaData when reader is closed."
    }
}

当代码试图读取结果 IDataReader(在 object videoID = firstRecord["VideoID"];)时,我得到这个异常:

Invalid attempt to call MetaData when reader is closed.

这是因为SqlDataReader被处置了。有人可以提供一种推荐的方法来以异步方式枚举 SqlDataReader 以便每个结果记录都可用于调用方法吗?谢谢。

在这种情况下,LINQ 不是你的朋友,因为 FirstAsync 将在 之前 关闭迭代器 returns 结果,这不是' ADO.NET 所期望的;基本上:不要在这里使用 LINQ,或者至少:不要以这种方式使用。您可以使用 Select 之类的东西在序列仍处于打开状态时 执行投影 ,或者将此处的所有工作卸载到像这样的工具可能更容易小巧玲珑。或者,手动执行:

await foreach (var record in records)
{
    // TODO: process record
    // (perhaps "break"), because you only want the first
}

您可以通过不 returning 依赖于仍处于打开状态的连接的对象来避免这种情况。例如,如果您只需要 VideoID,那么只需 return(我假设它是一个 int):

public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return reader["VideoID"];
    ...
}

或者投射到你自己的 class:

public class MyRecord {
    public int VideoId { get; set; }
}

public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return new MyRecord {
                        VideoId = reader["VideoID"]
                    }
    ...
}

或者按照 Marc 的建议,在第一个之后使用 foreachbreak,在您的情况下看起来像这样:

IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
    videoID = record["VideoID"];
    break;
}

当您公开一个打开的 DataReader 时,将其与底层 Connection 一起关闭的责任现在属于调用者,因此您不应处置任何东西。相反,您应该使用 DbCommand.ExecuteReaderAsync overload that accepts a CommandBehavior 参数,并传递 CommandBehavior.CloseConnection 值:

When the command is executed, the associated Connection object is closed when the associated DataReader object is closed.

那么你只能希望调用者遵守规则并及时调用 DataReader.Close 方法,并且在对象被垃圾回收之前不会让连接打开。出于这个原因,公开一个开放的 DataReader 应该被认为是一种极端的性能优化技术,应该谨慎使用。

顺便说一句,如果您返回 IEnumerable<IDataRecord> 而不是 IAsyncEnumerable<IDataRecord>,您也会遇到同样的问题。

要添加到其他答案中,您可以使您的实用程序方法通用并添加一个投影委托,Func<IDataRecord, T> projection,作为这样的参数:

public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
    string connectionString, SqlParameter[] parameters, string commandText,
    Func<IDataRecord, T> projection, // Parameter here
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    ...
                    yield return projection(reader); // Projected here
    ...
}

然后在调用时传入一个lambda或者引用这样的方法组:

public static object GetVideoId(IDataRecord dataRecord)
    => dataRecord["VideoID"];

这样的:

GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);

在 2021 年底离开这里,我有这个确切的问题。我找不到一个完整的例子,所以我只是乱搞我能找到的东西,直到我找到可以工作的东西。

这是我的完整代码,虽然很简单(所以你可以稍后扩展它)示例,以及一些注释,详细说明我一路走来的一些坑:

// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
    // we need a connection that will last as long as the reader is open,
    // alternately you could pass in an open connection.
    using SqlConnection connection = new SqlConnection( connectionString );
    using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );

    await connection.OpenAsync();
    var reader = await cmd.ExecuteReaderAsync();
    while( await reader.ReadAsync() )
    {
        yield return func( reader );
    }
}

然后在您的(异步)代码的任何其他部分,您可以在 await foreach 循环中调用您的函数:

private static async Task CallIAsyncEnumerable()
{
    await foreach( var category in ReadAsync( ReaderToCategory ) )
    {
        // do something with your category; save it in a list, write it to disk,
        // make an HTTP call ... the whole world is yours!
    }
}

// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
    return new Category()
    {
        Code = ( string )reader[ "Code" ],
        Group = ( string )reader[ "Group" ]
    };
}

我发现的其他一些事情:您不能从 tryyield,但是您可以将 cmd.ExecuteReaderAsync() 之前(包括)的所有内容填充到 try,或 returns DataReader 的单独方法。或者您可以将 await foreach 包装在 try 块中;我认为问题在于在 尝试之外屈服于调用者(在您考虑之后,这是有道理的)。

如果您使用另一种方法生成 reader,将连接传递给该方法,这样您就可以控制它的生命周期。如果您的方法创建连接,执行命令,并且 returns SqlDataReader,连接将关闭(如果您使用了“using”),然后您才能从 reader 中读取。再一次,如果你仔细想想,这是完全有道理的,但它让我绊倒了几分钟。

祝你好运,我希望这对以后的其他人有所帮助!