如何使用 SqlDataReader Return 和使用 IAsyncEnumerable
How to Return and Consume an IAsyncEnumerable with SqlDataReader
请看以下两种方法。第一个returns一个IAsyncEnumerable
。第二个尝试消耗它。
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class SqlUtility
{
public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
string connectionString, SqlParameter[] parameters, string commandText,
[EnumeratorCancellation]CancellationToken cancellationToken)
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
using (SqlCommand command = new SqlCommand(commandText, connection))
{
command.Parameters.AddRange(parameters);
using (var reader = await command.ExecuteReaderAsync()
.ConfigureAwait(false))
{
while (await reader.ReadAsync().ConfigureAwait(false))
{
yield return reader;
}
}
}
}
}
public static async Task Example()
{
const string connectionString =
"Server=localhost;Database=[Redacted];Integrated Security=true";
SqlParameter[] parameters = new SqlParameter[]
{
new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
};
const string commandText = "select * from Video where VideoID=@VideoID";
IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
parameters, commandText, CancellationToken.None);
IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
object videoID = firstRecord["VideoID"]; //Should be 1000.
// Instead, I get this exception:
// "Invalid attempt to call MetaData when reader is closed."
}
}
当代码试图读取结果 IDataReader
(在 object videoID = firstRecord["VideoID"];
)时,我得到这个异常:
Invalid attempt to call MetaData when reader is closed.
这是因为SqlDataReader
被处置了。有人可以提供一种推荐的方法来以异步方式枚举 SqlDataReader
以便每个结果记录都可用于调用方法吗?谢谢。
在这种情况下,LINQ 不是你的朋友,因为 FirstAsync
将在 之前 关闭迭代器 returns 结果,这不是' ADO.NET 所期望的;基本上:不要在这里使用 LINQ,或者至少:不要以这种方式使用。您可以使用 Select
之类的东西在序列仍处于打开状态时 执行投影 ,或者将此处的所有工作卸载到像这样的工具可能更容易小巧玲珑。或者,手动执行:
await foreach (var record in records)
{
// TODO: process record
// (perhaps "break"), because you only want the first
}
您可以通过不 returning 依赖于仍处于打开状态的连接的对象来避免这种情况。例如,如果您只需要 VideoID
,那么只需 return(我假设它是一个 int
):
public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
...
yield return reader["VideoID"];
...
}
或者投射到你自己的 class:
public class MyRecord {
public int VideoId { get; set; }
}
public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
...
yield return new MyRecord {
VideoId = reader["VideoID"]
}
...
}
或者按照 Marc 的建议,在第一个之后使用 foreach
和 break
,在您的情况下看起来像这样:
IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
videoID = record["VideoID"];
break;
}
当您公开一个打开的 DataReader
时,将其与底层 Connection
一起关闭的责任现在属于调用者,因此您不应处置任何东西。相反,您应该使用 DbCommand.ExecuteReaderAsync
overload that accepts a CommandBehavior
参数,并传递 CommandBehavior.CloseConnection
值:
When the command is executed, the associated Connection object is closed when the associated DataReader object is closed.
那么你只能希望调用者遵守规则并及时调用 DataReader.Close
方法,并且在对象被垃圾回收之前不会让连接打开。出于这个原因,公开一个开放的 DataReader
应该被认为是一种极端的性能优化技术,应该谨慎使用。
顺便说一句,如果您返回 IEnumerable<IDataRecord>
而不是 IAsyncEnumerable<IDataRecord>
,您也会遇到同样的问题。
要添加到其他答案中,您可以使您的实用程序方法通用并添加一个投影委托,Func<IDataRecord, T> projection
,作为这样的参数:
public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
string connectionString, SqlParameter[] parameters, string commandText,
Func<IDataRecord, T> projection, // Parameter here
[EnumeratorCancellation] CancellationToken cancellationToken)
{
...
yield return projection(reader); // Projected here
...
}
然后在调用时传入一个lambda或者引用这样的方法组:
public static object GetVideoId(IDataRecord dataRecord)
=> dataRecord["VideoID"];
这样的:
GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);
在 2021 年底离开这里,我有这个确切的问题。我找不到一个完整的例子,所以我只是乱搞我能找到的东西,直到我找到可以工作的东西。
这是我的完整代码,虽然很简单(所以你可以稍后扩展它)示例,以及一些注释,详细说明我一路走来的一些坑:
// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
// we need a connection that will last as long as the reader is open,
// alternately you could pass in an open connection.
using SqlConnection connection = new SqlConnection( connectionString );
using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );
await connection.OpenAsync();
var reader = await cmd.ExecuteReaderAsync();
while( await reader.ReadAsync() )
{
yield return func( reader );
}
}
然后在您的(异步)代码的任何其他部分,您可以在 await foreach
循环中调用您的函数:
private static async Task CallIAsyncEnumerable()
{
await foreach( var category in ReadAsync( ReaderToCategory ) )
{
// do something with your category; save it in a list, write it to disk,
// make an HTTP call ... the whole world is yours!
}
}
// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
return new Category()
{
Code = ( string )reader[ "Code" ],
Group = ( string )reader[ "Group" ]
};
}
我发现的其他一些事情:您不能从 try
中 yield
,但是您可以将 cmd.ExecuteReaderAsync()
之前(包括)的所有内容填充到 try
,或 returns DataReader 的单独方法。或者您可以将 await foreach
包装在 try
块中;我认为问题在于在 尝试之外屈服于调用者(在您考虑之后,这是有道理的)。
如果您使用另一种方法生成 reader,将连接传递给该方法,这样您就可以控制它的生命周期。如果您的方法创建连接,执行命令,并且 returns SqlDataReader
,连接将关闭(如果您使用了“using”),然后您才能从 reader 中读取。再一次,如果你仔细想想,这是完全有道理的,但它让我绊倒了几分钟。
祝你好运,我希望这对以后的其他人有所帮助!
请看以下两种方法。第一个returns一个IAsyncEnumerable
。第二个尝试消耗它。
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class SqlUtility
{
public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
string connectionString, SqlParameter[] parameters, string commandText,
[EnumeratorCancellation]CancellationToken cancellationToken)
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
using (SqlCommand command = new SqlCommand(commandText, connection))
{
command.Parameters.AddRange(parameters);
using (var reader = await command.ExecuteReaderAsync()
.ConfigureAwait(false))
{
while (await reader.ReadAsync().ConfigureAwait(false))
{
yield return reader;
}
}
}
}
}
public static async Task Example()
{
const string connectionString =
"Server=localhost;Database=[Redacted];Integrated Security=true";
SqlParameter[] parameters = new SqlParameter[]
{
new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
};
const string commandText = "select * from Video where VideoID=@VideoID";
IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
parameters, commandText, CancellationToken.None);
IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
object videoID = firstRecord["VideoID"]; //Should be 1000.
// Instead, I get this exception:
// "Invalid attempt to call MetaData when reader is closed."
}
}
当代码试图读取结果 IDataReader
(在 object videoID = firstRecord["VideoID"];
)时,我得到这个异常:
Invalid attempt to call MetaData when reader is closed.
这是因为SqlDataReader
被处置了。有人可以提供一种推荐的方法来以异步方式枚举 SqlDataReader
以便每个结果记录都可用于调用方法吗?谢谢。
在这种情况下,LINQ 不是你的朋友,因为 FirstAsync
将在 之前 关闭迭代器 returns 结果,这不是' ADO.NET 所期望的;基本上:不要在这里使用 LINQ,或者至少:不要以这种方式使用。您可以使用 Select
之类的东西在序列仍处于打开状态时 执行投影 ,或者将此处的所有工作卸载到像这样的工具可能更容易小巧玲珑。或者,手动执行:
await foreach (var record in records)
{
// TODO: process record
// (perhaps "break"), because you only want the first
}
您可以通过不 returning 依赖于仍处于打开状态的连接的对象来避免这种情况。例如,如果您只需要 VideoID
,那么只需 return(我假设它是一个 int
):
public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
...
yield return reader["VideoID"];
...
}
或者投射到你自己的 class:
public class MyRecord {
public int VideoId { get; set; }
}
public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
...
yield return new MyRecord {
VideoId = reader["VideoID"]
}
...
}
或者按照 Marc 的建议,在第一个之后使用 foreach
和 break
,在您的情况下看起来像这样:
IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
videoID = record["VideoID"];
break;
}
当您公开一个打开的 DataReader
时,将其与底层 Connection
一起关闭的责任现在属于调用者,因此您不应处置任何东西。相反,您应该使用 DbCommand.ExecuteReaderAsync
overload that accepts a CommandBehavior
参数,并传递 CommandBehavior.CloseConnection
值:
When the command is executed, the associated Connection object is closed when the associated DataReader object is closed.
那么你只能希望调用者遵守规则并及时调用 DataReader.Close
方法,并且在对象被垃圾回收之前不会让连接打开。出于这个原因,公开一个开放的 DataReader
应该被认为是一种极端的性能优化技术,应该谨慎使用。
顺便说一句,如果您返回 IEnumerable<IDataRecord>
而不是 IAsyncEnumerable<IDataRecord>
,您也会遇到同样的问题。
要添加到其他答案中,您可以使您的实用程序方法通用并添加一个投影委托,Func<IDataRecord, T> projection
,作为这样的参数:
public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
string connectionString, SqlParameter[] parameters, string commandText,
Func<IDataRecord, T> projection, // Parameter here
[EnumeratorCancellation] CancellationToken cancellationToken)
{
...
yield return projection(reader); // Projected here
...
}
然后在调用时传入一个lambda或者引用这样的方法组:
public static object GetVideoId(IDataRecord dataRecord)
=> dataRecord["VideoID"];
这样的:
GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);
在 2021 年底离开这里,我有这个确切的问题。我找不到一个完整的例子,所以我只是乱搞我能找到的东西,直到我找到可以工作的东西。
这是我的完整代码,虽然很简单(所以你可以稍后扩展它)示例,以及一些注释,详细说明我一路走来的一些坑:
// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
// we need a connection that will last as long as the reader is open,
// alternately you could pass in an open connection.
using SqlConnection connection = new SqlConnection( connectionString );
using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );
await connection.OpenAsync();
var reader = await cmd.ExecuteReaderAsync();
while( await reader.ReadAsync() )
{
yield return func( reader );
}
}
然后在您的(异步)代码的任何其他部分,您可以在 await foreach
循环中调用您的函数:
private static async Task CallIAsyncEnumerable()
{
await foreach( var category in ReadAsync( ReaderToCategory ) )
{
// do something with your category; save it in a list, write it to disk,
// make an HTTP call ... the whole world is yours!
}
}
// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
return new Category()
{
Code = ( string )reader[ "Code" ],
Group = ( string )reader[ "Group" ]
};
}
我发现的其他一些事情:您不能从 try
中 yield
,但是您可以将 cmd.ExecuteReaderAsync()
之前(包括)的所有内容填充到 try
,或 returns DataReader 的单独方法。或者您可以将 await foreach
包装在 try
块中;我认为问题在于在 尝试之外屈服于调用者(在您考虑之后,这是有道理的)。
如果您使用另一种方法生成 reader,将连接传递给该方法,这样您就可以控制它的生命周期。如果您的方法创建连接,执行命令,并且 returns SqlDataReader
,连接将关闭(如果您使用了“using”),然后您才能从 reader 中读取。再一次,如果你仔细想想,这是完全有道理的,但它让我绊倒了几分钟。
祝你好运,我希望这对以后的其他人有所帮助!