如何创建可以转换流的流包装流

How to create a stream wrapping stream that can transform a stream

var incomingStream = ...
var outgoingStream = ...

await incomingStream.CopyToAsync(outgoingStream);

上面的代码很简单,将传入流复制到传出流。两个流都被分块传输 coming/going 通过 interet。

现在,假设我想用 Func<Stream,Stream,Task> 之类的东西转换流,如果不读取所有数据,我将如何做到这一点。

因为我可以做到

var ms = new MemoryStream();
incomingStream.CopyTo(ms);

--- do transform of streams and seek
ms.CopyTo(outgoingStream)

但这会把漏洞读入 ms,是否有任何内置的东西允许我从传入流中读取并写入一个新流,它不会缓冲所有内容,而是只保留一个小的内部流缓冲数据,并且在数据再次被拉出之前它不会从传入流中读取。

我想做的是:

    protected async Task XmlToJsonStream(Stream instream, Stream outStream)
    {
        XmlReaderSettings readerSettings = new XmlReaderSettings();
        readerSettings.IgnoreWhitespace = false;
        var reader = XmlReader.Create(instream, readerSettings);
        var jsonWriter = new JsonTextWriter(new StreamWriter(outStream));
        jsonWriter.WriteStartObject();

        while (await reader.ReadAsync())
        {
            jsonWriter.writeReader(reader);
        }
        jsonWriter.WriteEndObject();
        jsonWriter.Flush();
    }
    protected async Task XmlFilterStream(Stream instream, Stream outStream)
    {
        XmlReaderSettings readerSettings = new XmlReaderSettings();
        readerSettings.IgnoreWhitespace = false;
        var reader = XmlReader.Create(instream, readerSettings);
        var writer = XmlWriter.Create(outStream, new XmlWriterSettings { Async = true, CloseOutput = false })

        while (reader.Read())
        {
            writer.writeReader(reader);
        }


    }

但是我不知道怎么连接。

var incomingStream = ...
var outgoingStream = ...
var temp=...  
XmlFilterStream(incomingStream,temp);
XmlToJsonStream(temp,outgoingstream);

因为如果我使用 MemoryStream 作为临时文件,它会不会在最后将它全部存储在流中。寻找读取数据后再次丢弃数据的流。

以上所有只是示例代码,缺少一些处理和查找原因,但我希望我能够说明我要做什么。能够根据设置在仅复制流、进行 xml 过滤和可选的将其转换为 json.

之间即插即用

流是 字节 的序列,因此流转换类似于 Func<ArraySegment<byte>, ArraySegment<byte>>。然后您可以以流方式应用它:

async Task TransformAsync(this Stream source, Func<ArraySegment<byte>, ArraySegment<byte>> transform, Stream destination, int bufferSize = 1024)
{
  var buffer = new byte[bufferSize];
  while (true)
  {
    var bytesRead = await source.ReadAsync(buffer, 0, bufferSize);
    if (bytesRead == 0)
      return;
    var bytesToWrite = transform(new ArraySegment(buffer, 0, bytesRead));
    if (bytesToWrite.Count != 0)
      await destination.WriteAsync(bytesToWrite.Buffer, bytesToWrite.Offset, bytesToWrite.Count);
  }
}

它比那要复杂一点,但这是一般的想法。它需要一些逻辑来确保 WriteAsync 写入所有字节;除了 transform 方法之外,通常还需要一个 "flush" 方法,该方法在源流完成时调用,因此转换算法有最后机会 return 它的最终结果要写入输出流的数据。

如果您想要 其他 类型的流,例如 XML 或 JSON 类型,那么您最好使用 Reactive Extensions.

我不确定我是否完全理解您的问题,但我认为您是在问如何在不先将其完全加载到内存的情况下对输入流进行操作。

在这种情况下,您不会想做这样的事情:

var ms = new MemoryStream();
incomingStream.CopyTo(ms);

确实 将整个输入流 incomingStream 加载到内存中 -- 加载到 ms.

据我所知,您的 XmlFilterStream 方法似乎是多余的,即 XmlToJsonStream 完成了 XmlFilterStream 所做的一切。

为什么不只拥有:

protected async Task XmlToJsonStream(Stream instream, Stream outStream)
{
    XmlReaderSettings readerSettings = new XmlReaderSettings();
    readerSettings.IgnoreWhitespace = false;
    var reader = XmlReader.Create(instream, readerSettings);
    var jsonWriter = new JsonTextWriter(new StreamWriter(outStream));
    jsonWriter.WriteStartObject();

    while (await reader.ReadAsync())
    {
        jsonWriter.writeReader(reader);
    }
    jsonWriter.WriteEndObject();
    jsonWriter.Flush();
}

并这样称呼它:

var incomingStream = ...
var outgoingStream = ...
XmlToJsonStream(incomingStream ,outgoingstream);

如果答案是您在 XmlFilterStream 中遗漏了一些重要细节,那么在没有看到这些细节的情况下,我建议您将它们集成到一个 XmlToJsonStream 函数中。