如何 link 两个希望您提供流的 C# API?

How to link two C# APIs that expect you to provide a stream?

我正在使用两个 C# 流 API,其中之一 is a data source and the other of which is a data sink

API 都没有真正公开流对象;两者都希望您将流传递给它们,并且它们会处理流中的 writing/reading。

有没有办法 link 这些 API 在一起,这样源的输出就可以流式传输到接收器中,而不必在 MemoryStream 中缓冲整个源?这是一个对 RAM 非常敏感的应用程序。

这是一个使用我试图避免的 MemoryStream 方法的示例,因为它在将整个流写入 S3 之前将其缓冲在 RAM 中:

using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // This destructor finishes the file and transferUtil closes 
    // the stream, so we need this weird using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, buffer)) 
        using (var rowGroupWriter = parquetWriter.CreateRowGroup())
        {
            rowGroupWriter.WriteColumn(...);
            ...
        }
    transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}

您正在寻找可以同时传递给数据源和接收器的流,并且可以 'transfer' 在两者之间异步传输数据。有许多可能的解决方案,我可能考虑过围绕 BlockingCollection 的生产者-消费者模式。

最近,System.IO.Pipelines、Span 和 Memory 类型的添加真正专注于高性能 IO,我认为它非常适合这里。 Pipe class 及其关联的 Reader 和 Writer,可以自动处理它们之间的流量控制、背压和 IO,同时利用所有新的 Span 和 Memory 相关类型。

我已经在 PipeStream 上传了一个要点,它将为您提供一个带有内部 Pipe 实现的自定义流,您可以将其传递给您的 API classes。写入 WriteAsync(或 Write)方法的任何内容都将可供 ReadAsync(或 Read)方法使用,而无需任何进一步的 byte[] 或 MemoryStream 分配

在您的情况下,您只需将 MemoryStream 替换为这个新的 class,它应该开箱即用。我没有进行完整的 S3 测试,但直接从 Parquet 流读取并将其转储到控制台 window 表明它是异步工​​作的。

// Create some very badly 'mocked' data
var idColumn = new DataColumn(
    new DataField<int>("id"),
    Enumerable.Range(0, 10000).Select(i => i).ToArray());

var cityColumn = new DataColumn(
    new DataField<string>("city"),
    Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());

var schema = new Schema(idColumn.Field, cityColumn.Field);

using (var pipeStream = new PipeStream())
{
    var buffer = new byte[4096];
    int read = 0;

    var readTask = Task.Run(async () =>
    {
        //transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread 
        while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var incoming = Encoding.ASCII.GetString(buffer, 0, read);
            Console.WriteLine(incoming);
            // await Task.Delay(5000); uncomment this to simulate very slow consumer
        }
    });

    using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(idColumn);  // Step through both these statements to see data read before the parquetWriter completes
        rowGroupWriter.WriteColumn(cityColumn);
    }       
}

实施尚未完全完成,但我认为它展示了一种不错的方法。在控制台 'readTask' 中,您可以取消注释 Task.Delay 以模拟慢速读取 (transferUtil),您应该会看到管道自动限制写入任务。

对于其中一种 Span 扩展方法,您需要使用 C# 7.2 或更高版本(VS 2017 -> 项目属性 -> 构建 -> 高级 -> 语言版本),但它应该与任何 .Net 框架兼容。您可能需要 Nuget Package

流是可读和可写的(很明显!)但不可搜索,这在这种情况下应该适合您,但无法从需要可搜索流的 Parquet SDK 读取。

希望对您有所帮助

使用 System.IO.Pipelines 看起来像这样:

var pipe = new System.IO.Pipelines.Pipe();
using (var buffer = pipe.Writer.AsStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // we can start the consumer first because it will just block 
    // on the stream until data is available
    Task consumer = transferUtil.UploadAsync(pipe.Reader.AsStream(), _bucketName, _key.Replace(".gz", "") + ".parquet");
    // start a task to produce data
    Task producer = WriteParquetAsync(buffer, ..);
    // start pumping data; we can wait here because the producer will
    // necessarily finish before the consumer does
    await producer;
    // this is key; disposing of the buffer early here causes the consumer stream
    // to terminate, else it will just hang waiting on the stream to finish.
    // see the documentation for Writer.AsStream(bool leaveOpen = false)
    buffer.Dispose();
    // wait the upload to finish
    await consumer;

}