如何 link 两个希望您提供流的 C# API?
How to link two C# APIs that expect you to provide a stream?
我正在使用两个 C# 流 API,其中之一 is a data source and the other of which is a data sink。
API 都没有真正公开流对象;两者都希望您将流传递给它们,并且它们会处理流中的 writing/reading。
有没有办法 link 这些 API 在一起,这样源的输出就可以流式传输到接收器中,而不必在 MemoryStream 中缓冲整个源?这是一个对 RAM 非常敏感的应用程序。
这是一个使用我试图避免的 MemoryStream 方法的示例,因为它在将整个流写入 S3 之前将其缓冲在 RAM 中:
using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
// This destructor finishes the file and transferUtil closes
// the stream, so we need this weird using nesting to keep everyone happy.
using (var parquetWriter = new ParquetWriter(schema, buffer))
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(...);
...
}
transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
您正在寻找可以同时传递给数据源和接收器的流,并且可以 'transfer' 在两者之间异步传输数据。有许多可能的解决方案,我可能考虑过围绕 BlockingCollection 的生产者-消费者模式。
最近,System.IO.Pipelines、Span 和 Memory 类型的添加真正专注于高性能 IO,我认为它非常适合这里。 Pipe class 及其关联的 Reader 和 Writer,可以自动处理它们之间的流量控制、背压和 IO,同时利用所有新的 Span 和 Memory 相关类型。
我已经在 PipeStream 上传了一个要点,它将为您提供一个带有内部 Pipe 实现的自定义流,您可以将其传递给您的 API classes。写入 WriteAsync(或 Write)方法的任何内容都将可供 ReadAsync(或 Read)方法使用,而无需任何进一步的 byte[] 或 MemoryStream 分配
在您的情况下,您只需将 MemoryStream 替换为这个新的 class,它应该开箱即用。我没有进行完整的 S3 测试,但直接从 Parquet 流读取并将其转储到控制台 window 表明它是异步工作的。
// Create some very badly 'mocked' data
var idColumn = new DataColumn(
new DataField<int>("id"),
Enumerable.Range(0, 10000).Select(i => i).ToArray());
var cityColumn = new DataColumn(
new DataField<string>("city"),
Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());
var schema = new Schema(idColumn.Field, cityColumn.Field);
using (var pipeStream = new PipeStream())
{
var buffer = new byte[4096];
int read = 0;
var readTask = Task.Run(async () =>
{
//transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread
while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var incoming = Encoding.ASCII.GetString(buffer, 0, read);
Console.WriteLine(incoming);
// await Task.Delay(5000); uncomment this to simulate very slow consumer
}
});
using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(idColumn); // Step through both these statements to see data read before the parquetWriter completes
rowGroupWriter.WriteColumn(cityColumn);
}
}
实施尚未完全完成,但我认为它展示了一种不错的方法。在控制台 'readTask' 中,您可以取消注释 Task.Delay 以模拟慢速读取 (transferUtil),您应该会看到管道自动限制写入任务。
对于其中一种 Span 扩展方法,您需要使用 C# 7.2 或更高版本(VS 2017 -> 项目属性 -> 构建 -> 高级 -> 语言版本),但它应该与任何 .Net 框架兼容。您可能需要 Nuget Package
流是可读和可写的(很明显!)但不可搜索,这在这种情况下应该适合您,但无法从需要可搜索流的 Parquet SDK 读取。
希望对您有所帮助
使用 System.IO.Pipelines
看起来像这样:
var pipe = new System.IO.Pipelines.Pipe();
using (var buffer = pipe.Writer.AsStream())
using (var transferUtil = new TransferUtility(s3client))
{
// we can start the consumer first because it will just block
// on the stream until data is available
Task consumer = transferUtil.UploadAsync(pipe.Reader.AsStream(), _bucketName, _key.Replace(".gz", "") + ".parquet");
// start a task to produce data
Task producer = WriteParquetAsync(buffer, ..);
// start pumping data; we can wait here because the producer will
// necessarily finish before the consumer does
await producer;
// this is key; disposing of the buffer early here causes the consumer stream
// to terminate, else it will just hang waiting on the stream to finish.
// see the documentation for Writer.AsStream(bool leaveOpen = false)
buffer.Dispose();
// wait the upload to finish
await consumer;
}
我正在使用两个 C# 流 API,其中之一 is a data source and the other of which is a data sink。
API 都没有真正公开流对象;两者都希望您将流传递给它们,并且它们会处理流中的 writing/reading。
有没有办法 link 这些 API 在一起,这样源的输出就可以流式传输到接收器中,而不必在 MemoryStream 中缓冲整个源?这是一个对 RAM 非常敏感的应用程序。
这是一个使用我试图避免的 MemoryStream 方法的示例,因为它在将整个流写入 S3 之前将其缓冲在 RAM 中:
using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
// This destructor finishes the file and transferUtil closes
// the stream, so we need this weird using nesting to keep everyone happy.
using (var parquetWriter = new ParquetWriter(schema, buffer))
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(...);
...
}
transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
您正在寻找可以同时传递给数据源和接收器的流,并且可以 'transfer' 在两者之间异步传输数据。有许多可能的解决方案,我可能考虑过围绕 BlockingCollection 的生产者-消费者模式。
最近,System.IO.Pipelines、Span 和 Memory 类型的添加真正专注于高性能 IO,我认为它非常适合这里。 Pipe class 及其关联的 Reader 和 Writer,可以自动处理它们之间的流量控制、背压和 IO,同时利用所有新的 Span 和 Memory 相关类型。
我已经在 PipeStream 上传了一个要点,它将为您提供一个带有内部 Pipe 实现的自定义流,您可以将其传递给您的 API classes。写入 WriteAsync(或 Write)方法的任何内容都将可供 ReadAsync(或 Read)方法使用,而无需任何进一步的 byte[] 或 MemoryStream 分配
在您的情况下,您只需将 MemoryStream 替换为这个新的 class,它应该开箱即用。我没有进行完整的 S3 测试,但直接从 Parquet 流读取并将其转储到控制台 window 表明它是异步工作的。
// Create some very badly 'mocked' data
var idColumn = new DataColumn(
new DataField<int>("id"),
Enumerable.Range(0, 10000).Select(i => i).ToArray());
var cityColumn = new DataColumn(
new DataField<string>("city"),
Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());
var schema = new Schema(idColumn.Field, cityColumn.Field);
using (var pipeStream = new PipeStream())
{
var buffer = new byte[4096];
int read = 0;
var readTask = Task.Run(async () =>
{
//transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread
while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var incoming = Encoding.ASCII.GetString(buffer, 0, read);
Console.WriteLine(incoming);
// await Task.Delay(5000); uncomment this to simulate very slow consumer
}
});
using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(idColumn); // Step through both these statements to see data read before the parquetWriter completes
rowGroupWriter.WriteColumn(cityColumn);
}
}
实施尚未完全完成,但我认为它展示了一种不错的方法。在控制台 'readTask' 中,您可以取消注释 Task.Delay 以模拟慢速读取 (transferUtil),您应该会看到管道自动限制写入任务。
对于其中一种 Span 扩展方法,您需要使用 C# 7.2 或更高版本(VS 2017 -> 项目属性 -> 构建 -> 高级 -> 语言版本),但它应该与任何 .Net 框架兼容。您可能需要 Nuget Package
流是可读和可写的(很明显!)但不可搜索,这在这种情况下应该适合您,但无法从需要可搜索流的 Parquet SDK 读取。
希望对您有所帮助
使用 System.IO.Pipelines
看起来像这样:
var pipe = new System.IO.Pipelines.Pipe();
using (var buffer = pipe.Writer.AsStream())
using (var transferUtil = new TransferUtility(s3client))
{
// we can start the consumer first because it will just block
// on the stream until data is available
Task consumer = transferUtil.UploadAsync(pipe.Reader.AsStream(), _bucketName, _key.Replace(".gz", "") + ".parquet");
// start a task to produce data
Task producer = WriteParquetAsync(buffer, ..);
// start pumping data; we can wait here because the producer will
// necessarily finish before the consumer does
await producer;
// this is key; disposing of the buffer early here causes the consumer stream
// to terminate, else it will just hang waiting on the stream to finish.
// see the documentation for Writer.AsStream(bool leaveOpen = false)
buffer.Dispose();
// wait the upload to finish
await consumer;
}