哪种方法限制内存使用:SqlReader.GetBytes 或 SqlReader.GetStream 用于大 blob?

Which approach limits memory usage: SqlReader.GetBytes or SqlReader.GetStream for large blobs?

我想确定如何限制从本地数据库检索 blob 并通过块将其传输到第三方 Web 服务的作业中的内存使用。

使用 SqlDataReader,我似乎有两个选择:

  1. 创建一个使用带有偏移量的 GetBytes 的方法来检索返回 byte[] 的 blob 的一部分。然后该方法的调用者将负责发出网络请求以传输此块。
  2. 创建一个使用 GetStream 的方法,并向 ReadAsync 发出多个请求以填充字节 [] 缓冲区,并使用此缓冲区发出 Web 请求,直到文档传输完成。

我更喜欢选项 1,因为它限制了方法的责任,但是如果我调用带有偏移量的 GetBytes,它会将整个偏移量加载到内存中还是 sql 服务器只能返回请求的小块? 如果我使用选项 2,那么该方法将有两个职责,从数据库加载一个块并发出 Web 请求以将文档存储在其他地方。

// option 1
public async Task<Tuple<int, byte[]>> GetDocumentChunk(int documentId, int offset, int maxChunkSize)
{
    var buffer = new byte[maxChunkSize];

    string sql = "SELECT Data FROM Document WHERE Id = @Id";

    using (SqlConnection connection = new SqlConnection(ConnectionString))
    {
        await connection.OpenAsync();

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);

            using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
            {
                if (await reader.ReadAsync())
                {
                    int bytesRead = (int)reader.GetBytes(0, offset, buffer, 0, maxChunkSize);
                    return new Tuple<int, byte[]>(bytesRead, buffer);
                }
            }
        }
    }

    return new Tuple<int, byte[]>(0, buffer);
}

//option 2
public async Task<CallResult> TransferDocument(int documentId, int maxChunkSize)
{
    var buffer = new byte[maxChunkSize];

    string sql = "SELECT Data FROM Document WHERE Id = @Id";

    using (SqlConnection connection = new SqlConnection(ConnectionString))
    {
        await connection.OpenAsync();

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);

            using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
            {
                using (Stream uploadDataStream = reader.GetStream(0))
                {
                    CallResult callResult;
                    int bytesRead;
                    do
                    {
                        bytesRead = await uploadDataStream.ReadAsync(buffer, 0, maxChunkSize);
                        callResult = await MyWebRequest(documentId, buffer, bytesRead);
                        if (callResult != CallResult.Success)
                        {
                            return callResult;
                        }
                    } while (bytesRead > 0);

                    return callResult;
                }
            }
        }
    }
}

使用选项 1,您将向源发出许多请求以获取数据,并且 GetBytes 不会 'search' 在 SQL 服务器上流式传输(如果它确实如此),那将是一个非常低效的解决方案。

IAsyncEnumerable

使用选项 2,您可以获得流并按需处理它,因此您将发出单个数据库请求并将获得异步的所有好处 I/O。

C# 8 IAsyncEnumerable 可以完美解决你的问题,但目前处于Preview阶段。

CopyToAsync

如果您可以获得需要上传内容的流,那么您可以在上面使用 CopyToAsync. But I assume that each chunk will be uploaded in the individual request. If so, you may introduce a component which will quack like a Stream but will actually upload content to the website when DB stream calls CopyToAsync()

class WebSiteChunkUploader : Stream
{
     private HttpClient _client = new HttpClient();
     public override bool CanWrite => true;
     public override bool CanRead => false;

     public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>

         await _client.PostAsync("localhost", new ByteArrayContent(buffer,offset, count));
}

旧的好 IEnumerable

遗憾的是,您不能将 IEnumerable 中的 yield returnasync/await 混合使用。但是,如果您决定使用阻塞 api 读取流,例如 Read,那么您可以使用旧的 yield return:

重写它
public IEnumerable<Tuple<byte[],int>> TransferDocument(int documentId, int maxChunkSize)
{
    string sql = "SELECT Data FROM Document WHERE Id = @Id";
    var buffer = new byte[maxChunkSize];
    using (SqlConnection connection = new SqlConnection(ConnectionString))
    {
        connection.Open();
        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);
            using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
            using (Stream uploadDataStream = reader.GetStream(0))
            {
                while(var bytesRead = uploadDataStream.Read(buffer, 0, maxChunkSize)) > 0)
                   yield return Tuple(buffer, bytesRead);
            }
        }
    }
}

...
async Task DoMyTransfer() 
{
  foreach(var buffer in TransferDocument(1, 10000)) {
    await moveBytes(buffer)
  }
}

在这种情况下,您不会有与数据库的异步 IO 和奇特的 Tasks,但我想您无论如何都需要限制此上传操作,以免数据库因连接而过载。