哪种方法限制内存使用:SqlReader.GetBytes 或 SqlReader.GetStream 用于大 blob?
Which approach limits memory usage: SqlReader.GetBytes or SqlReader.GetStream for large blobs?
我想确定如何限制从本地数据库检索 blob 并通过块将其传输到第三方 Web 服务的作业中的内存使用。
使用 SqlDataReader,我似乎有两个选择:
- 创建一个使用带有偏移量的 GetBytes 的方法来检索返回 byte[] 的 blob 的一部分。然后该方法的调用者将负责发出网络请求以传输此块。
- 创建一个使用 GetStream 的方法,并向 ReadAsync 发出多个请求以填充字节 [] 缓冲区,并使用此缓冲区发出 Web 请求,直到文档传输完成。
我更喜欢选项 1,因为它限制了方法的责任,但是如果我调用带有偏移量的 GetBytes,它会将整个偏移量加载到内存中还是 sql 服务器只能返回请求的小块?
如果我使用选项 2,那么该方法将有两个职责,从数据库加载一个块并发出 Web 请求以将文档存储在其他地方。
// option 1
public async Task<Tuple<int, byte[]>> GetDocumentChunk(int documentId, int offset, int maxChunkSize)
{
var buffer = new byte[maxChunkSize];
string sql = "SELECT Data FROM Document WHERE Id = @Id";
using (SqlConnection connection = new SqlConnection(ConnectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("@Id", documentId);
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
if (await reader.ReadAsync())
{
int bytesRead = (int)reader.GetBytes(0, offset, buffer, 0, maxChunkSize);
return new Tuple<int, byte[]>(bytesRead, buffer);
}
}
}
}
return new Tuple<int, byte[]>(0, buffer);
}
//option 2
public async Task<CallResult> TransferDocument(int documentId, int maxChunkSize)
{
var buffer = new byte[maxChunkSize];
string sql = "SELECT Data FROM Document WHERE Id = @Id";
using (SqlConnection connection = new SqlConnection(ConnectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("@Id", documentId);
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
using (Stream uploadDataStream = reader.GetStream(0))
{
CallResult callResult;
int bytesRead;
do
{
bytesRead = await uploadDataStream.ReadAsync(buffer, 0, maxChunkSize);
callResult = await MyWebRequest(documentId, buffer, bytesRead);
if (callResult != CallResult.Success)
{
return callResult;
}
} while (bytesRead > 0);
return callResult;
}
}
}
}
}
使用选项 1,您将向源发出许多请求以获取数据,并且 GetBytes
不会 'search' 在 SQL 服务器上流式传输(如果它确实如此),那将是一个非常低效的解决方案。
IAsyncEnumerable
使用选项 2,您可以获得流并按需处理它,因此您将发出单个数据库请求并将获得异步的所有好处 I/O。
用C# 8
IAsyncEnumerable 可以完美解决你的问题,但目前处于Preview
阶段。
CopyToAsync
如果您可以获得需要上传内容的流,那么您可以在上面使用 CopyToAsync. But I assume that each chunk will be uploaded in the individual request. If so, you may introduce a component which will quack like a Stream
but will actually upload content to the website when DB stream calls CopyToAsync():
class WebSiteChunkUploader : Stream
{
private HttpClient _client = new HttpClient();
public override bool CanWrite => true;
public override bool CanRead => false;
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
await _client.PostAsync("localhost", new ByteArrayContent(buffer,offset, count));
}
旧的好 IEnumerable
遗憾的是,您不能将 IEnumerable
中的 yield return
与 async/await
混合使用。但是,如果您决定使用阻塞 api 读取流,例如 Read
,那么您可以使用旧的 yield return
:
重写它
public IEnumerable<Tuple<byte[],int>> TransferDocument(int documentId, int maxChunkSize)
{
string sql = "SELECT Data FROM Document WHERE Id = @Id";
var buffer = new byte[maxChunkSize];
using (SqlConnection connection = new SqlConnection(ConnectionString))
{
connection.Open();
using (SqlCommand command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("@Id", documentId);
using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
using (Stream uploadDataStream = reader.GetStream(0))
{
while(var bytesRead = uploadDataStream.Read(buffer, 0, maxChunkSize)) > 0)
yield return Tuple(buffer, bytesRead);
}
}
}
}
...
async Task DoMyTransfer()
{
foreach(var buffer in TransferDocument(1, 10000)) {
await moveBytes(buffer)
}
}
在这种情况下,您不会有与数据库的异步 IO 和奇特的 Tasks
,但我想您无论如何都需要限制此上传操作,以免数据库因连接而过载。
我想确定如何限制从本地数据库检索 blob 并通过块将其传输到第三方 Web 服务的作业中的内存使用。
使用 SqlDataReader,我似乎有两个选择:
- 创建一个使用带有偏移量的 GetBytes 的方法来检索返回 byte[] 的 blob 的一部分。然后该方法的调用者将负责发出网络请求以传输此块。
- 创建一个使用 GetStream 的方法,并向 ReadAsync 发出多个请求以填充字节 [] 缓冲区,并使用此缓冲区发出 Web 请求,直到文档传输完成。
我更喜欢选项 1,因为它限制了方法的责任,但是如果我调用带有偏移量的 GetBytes,它会将整个偏移量加载到内存中还是 sql 服务器只能返回请求的小块? 如果我使用选项 2,那么该方法将有两个职责,从数据库加载一个块并发出 Web 请求以将文档存储在其他地方。
// option 1
public async Task<Tuple<int, byte[]>> GetDocumentChunk(int documentId, int offset, int maxChunkSize)
{
var buffer = new byte[maxChunkSize];
string sql = "SELECT Data FROM Document WHERE Id = @Id";
using (SqlConnection connection = new SqlConnection(ConnectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("@Id", documentId);
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
if (await reader.ReadAsync())
{
int bytesRead = (int)reader.GetBytes(0, offset, buffer, 0, maxChunkSize);
return new Tuple<int, byte[]>(bytesRead, buffer);
}
}
}
}
return new Tuple<int, byte[]>(0, buffer);
}
//option 2
public async Task<CallResult> TransferDocument(int documentId, int maxChunkSize)
{
var buffer = new byte[maxChunkSize];
string sql = "SELECT Data FROM Document WHERE Id = @Id";
using (SqlConnection connection = new SqlConnection(ConnectionString))
{
await connection.OpenAsync();
using (SqlCommand command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("@Id", documentId);
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
using (Stream uploadDataStream = reader.GetStream(0))
{
CallResult callResult;
int bytesRead;
do
{
bytesRead = await uploadDataStream.ReadAsync(buffer, 0, maxChunkSize);
callResult = await MyWebRequest(documentId, buffer, bytesRead);
if (callResult != CallResult.Success)
{
return callResult;
}
} while (bytesRead > 0);
return callResult;
}
}
}
}
}
使用选项 1,您将向源发出许多请求以获取数据,并且 GetBytes
不会 'search' 在 SQL 服务器上流式传输(如果它确实如此),那将是一个非常低效的解决方案。
IAsyncEnumerable
使用选项 2,您可以获得流并按需处理它,因此您将发出单个数据库请求并将获得异步的所有好处 I/O。
用C# 8
IAsyncEnumerable 可以完美解决你的问题,但目前处于Preview
阶段。
CopyToAsync
如果您可以获得需要上传内容的流,那么您可以在上面使用 CopyToAsync. But I assume that each chunk will be uploaded in the individual request. If so, you may introduce a component which will quack like a Stream
but will actually upload content to the website when DB stream calls CopyToAsync():
class WebSiteChunkUploader : Stream
{
private HttpClient _client = new HttpClient();
public override bool CanWrite => true;
public override bool CanRead => false;
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
await _client.PostAsync("localhost", new ByteArrayContent(buffer,offset, count));
}
旧的好 IEnumerable
遗憾的是,您不能将 IEnumerable
中的 yield return
与 async/await
混合使用。但是,如果您决定使用阻塞 api 读取流,例如 Read
,那么您可以使用旧的 yield return
:
public IEnumerable<Tuple<byte[],int>> TransferDocument(int documentId, int maxChunkSize)
{
string sql = "SELECT Data FROM Document WHERE Id = @Id";
var buffer = new byte[maxChunkSize];
using (SqlConnection connection = new SqlConnection(ConnectionString))
{
connection.Open();
using (SqlCommand command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("@Id", documentId);
using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
using (Stream uploadDataStream = reader.GetStream(0))
{
while(var bytesRead = uploadDataStream.Read(buffer, 0, maxChunkSize)) > 0)
yield return Tuple(buffer, bytesRead);
}
}
}
}
...
async Task DoMyTransfer()
{
foreach(var buffer in TransferDocument(1, 10000)) {
await moveBytes(buffer)
}
}
在这种情况下,您不会有与数据库的异步 IO 和奇特的 Tasks
,但我想您无论如何都需要限制此上传操作,以免数据库因连接而过载。