protobuf-net: Serialize a property of type System.IO.Stream without loading the entire stream into memory

protobuf-net cannot serialize the following class, because serializing objects of type Stream is not supported:

[ProtoContract]
class StreamObject
{
    [ProtoMember(1)]
    public Stream StreamProperty { get; set; }
}

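I know I can work around this by using a serialized property of type byte[] and reading the stream into that property, as shown in this question. But this requires loading the entire byte[] into memory, which can quickly exhaust system resources if the stream is very long.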

Is there a way to serialize a stream as an array of bytes in protobuf-net without loading the entire sequence of bytes into memory?

The basic difficulty here is not protobuf-net but the V2 protocol buffer format. A repeated element (e.g. a byte array or stream) can be encoded in two ways:

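  • As a packed repeated element. Here all of the elements of the field are packed into a single key-value pair with wire type 2 (length-delimited). Each element is encoded the same way it would be normally, except without the preceding tag.

    protobuf-net automatically encodes byte arrays as a single length-delimited field of this kind, but doing so requires knowing the total number of bytes in advance. For a byte stream, this might require loading the entire stream into memory (e.g. when StreamProperty.CanSeek == false), which violates your requirement.

  • As a repeated element. Here the encoded message has zero or more key-value pairs with the same tag number.

    For a byte stream, this format would bloat the encoded message massively, since each byte would require an additional integer key.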
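To put rough numbers on the two options, here is a small sketch that estimates wire sizes by hand. It does not call protobuf-net; WireSizeSketch and its methods are hypothetical names, and it assumes a field number below 16 (so each key fits in one byte) and that each element of the unpacked form is written as a varint.

```csharp
using System;

// Illustrative only: hand-computed wire sizes, not a protobuf-net API.
public static class WireSizeSketch
{
    // Number of bytes a base-128 varint needs for the given value.
    public static int VarintLength(ulong value)
    {
        int length = 1;
        while (value >= 0x80)
        {
            value >>= 7;
            length++;
        }
        return length;
    }

    // One length-delimited field: 1-byte key + varint length prefix + payload.
    // The length prefix comes first, so the payload size must be known up front.
    public static long LengthDelimitedSize(long payloadLength)
    {
        return 1 + VarintLength((ulong)payloadLength) + payloadLength;
    }

    // Unpacked repeated field: every element carries its own 1-byte key,
    // followed by the element itself encoded as a varint.
    public static long UnpackedSize(byte[] data)
    {
        long size = 0;
        foreach (byte b in data)
            size += 1 + VarintLength(b);
        return size;
    }

    public static void Main()
    {
        // Same test pattern as the 1,000,000-byte stream used in this answer.
        var data = new byte[1000000];
        for (int i = 0; i < data.Length; i++)
            data[i] = unchecked((byte)i);

        Console.WriteLine("Length-delimited: {0} bytes", LengthDelimitedSize(data.Length));
        Console.WriteLine("Unpacked:         {0} bytes", UnpackedSize(data));
    }
}
```

Under these assumptions the single length-delimited field costs only 4 bytes of framing, while the unpacked form is roughly 2.5 times the size of the payload.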

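As you can see, neither of the default representations meets your needs. Instead, it makes sense to encode a large byte stream as a sequence of "fairly large" chunks, where each chunk is packed but the overall sequence is not.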

The following version of StreamObject does this:

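using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ProtoBuf;

[ProtoContract]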
class StreamObject
{
    public StreamObject() : this(new MemoryStream()) { }

    public StreamObject(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.StreamProperty = stream;
    }

    [ProtoIgnore]
    public Stream StreamProperty { get; set; }

    internal static event EventHandler OnDataReadBegin;

    internal static event EventHandler OnDataReadEnd;

    const int ChunkSize = 4096;

    [ProtoMember(1, IsPacked = false, OverwriteList = true)]
    IEnumerable<ByteBuffer> Data
    {
        get
        {
            if (OnDataReadBegin != null)
                OnDataReadBegin(this, new EventArgs());

            while (true)
            {
                byte[] buffer = new byte[ChunkSize];
                int read = StreamProperty.Read(buffer, 0, buffer.Length);
                if (read <= 0)
                    break;
                // Stream.Read may return fewer bytes than requested before the end
                // of the stream, so trim the chunk and keep reading until it returns 0.
                if (read < buffer.Length)
                    Array.Resize(ref buffer, read);
                yield return new ByteBuffer { Data = buffer };
            }

            if (OnDataReadEnd != null)
                OnDataReadEnd(this, new EventArgs());
        }
        set
        {
            if (value == null)
                return;
            foreach (var buffer in value)
                StreamProperty.Write(buffer.Data, 0, buffer.Data.Length);
        }
    }
}

[ProtoContract]
struct ByteBuffer
{
    [ProtoMember(1, IsPacked = true)]
    public byte[] Data { get; set; }
}
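The Data getter depends on the iterator being lazy: a chunk is only read from the input when the consumer asks for the next element. The following standalone sketch shows that behaviour without protobuf-net. ChunkReaderSketch and ReadChunks are hypothetical names, and the loop keeps reading until Read returns 0, since Stream.Read is allowed to return fewer bytes than requested.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Illustrative only: the chunking loop in isolation.
public static class ChunkReaderSketch
{
    // Yields the remaining content of the stream as chunks of at most chunkSize bytes.
    public static IEnumerable<byte[]> ReadChunks(Stream stream, int chunkSize)
    {
        while (true)
        {
            var buffer = new byte[chunkSize];
            int read = stream.Read(buffer, 0, buffer.Length);
            if (read <= 0)
                yield break;                      // end of stream
            if (read < buffer.Length)
                Array.Resize(ref buffer, read);   // trim a short read
            yield return buffer;
        }
    }

    public static void Main()
    {
        var input = new MemoryStream(new byte[10000]);
        foreach (var chunk in ReadChunks(input, 4096))
        {
            // input.Position advances one chunk per iteration, which shows
            // that the stream is consumed on demand rather than up front.
            Console.WriteLine("chunk of {0} bytes, input.Position = {1}",
                chunk.Length, input.Position);
        }
    }
}
```

Only one chunk-sized buffer is live per iteration here, which is the property the serializable Data member relies on.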

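Notice the OnDataReadBegin and OnDataReadEnd events? I added them for debugging purposes, to make it possible to check that the input stream is actually being streamed into the output protobuf stream. The following test class does this: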

internal class TestClass
{
    public void Test()
    {
        var writeStream = new MemoryStream();

        long beginLength = 0;
        long endLength = 0;

        EventHandler begin = (o, e) => { beginLength = writeStream.Length; Console.WriteLine(string.Format("Begin serialization of Data, writeStream.Length = {0}", writeStream.Length)); };
        EventHandler end = (o, e) => { endLength = writeStream.Length;  Console.WriteLine(string.Format("End serialization of Data, writeStream.Length = {0}", writeStream.Length)); };

        StreamObject.OnDataReadBegin += begin;
        StreamObject.OnDataReadEnd += end;

        try
        {
            int length = 1000000;

            var inputStream = new MemoryStream();
            for (int i = 0; i < length; i++)
            {
                inputStream.WriteByte(unchecked((byte)i));
            }
            inputStream.Position = 0;

            var streamObject = new StreamObject(inputStream);

            Serializer.Serialize(writeStream, streamObject);
            var data = writeStream.ToArray();

            StreamObject newStreamObject;
            using (var s = new MemoryStream(data))
            {
                newStreamObject = Serializer.Deserialize<StreamObject>(s);
            }

            if (beginLength >= endLength)
            {
                throw new InvalidOperationException("inputStream was completely buffered before writing to writeStream");
            }

            inputStream.Position = 0;
            newStreamObject.StreamProperty.Position = 0;

            if (!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable()))
            {
                throw new InvalidOperationException("!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable())");
            }
            else
            {
                Console.WriteLine("Streams identical.");
            }
        }
        finally
        {
            StreamObject.OnDataReadBegin -= begin;
            StreamObject.OnDataReadEnd -= end;
        }
    }
}

public static class StreamExtensions
{
    public static IEnumerable<byte> AsEnumerable(this Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException();
        int b;
        while ((b = stream.ReadByte()) != -1)
            yield return checked((byte)b);
    }
}

The output of the above is:

Begin serialization of Data, writeStream.Length = 0
End serialization of Data, writeStream.Length = 1000888
Streams identical.

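This shows that the input stream is indeed streamed to the output: writeStream grows while Data is being enumerated, so the input is not loaded into memory all at once.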
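The reported length can be sanity-checked by hand. The sketch below assumes that each ByteBuffer is written as a length-prefixed nested message and that both field keys take one byte; ChunkFramingSketch and its methods are hypothetical names, not protobuf-net APIs.

```csharp
using System;

// Illustrative only: hand-computed framing cost of one chunk.
public static class ChunkFramingSketch
{
    // Number of bytes a base-128 varint needs for the given value.
    public static int VarintLength(ulong value)
    {
        int length = 1;
        while (value >= 0x80)
        {
            value >>= 7;
            length++;
        }
        return length;
    }

    // Bytes written for one chunk carrying the given number of payload bytes.
    public static long ChunkFrameSize(int payload)
    {
        // ByteBuffer.Data: key + length prefix + raw bytes.
        long inner = 1 + VarintLength((ulong)payload) + payload;
        // Element of StreamObject.Data: key + length prefix + nested message.
        return 1 + VarintLength((ulong)inner) + inner;
    }

    public static void Main()
    {
        const int total = 1000000;
        const int chunkSize = 4096;
        int fullChunks = total / chunkSize;   // number of complete chunks
        int lastChunk = total % chunkSize;    // size of the final partial chunk

        Console.WriteLine("Full chunks: {0} x {1} = {2} bytes",
            fullChunks, ChunkFrameSize(chunkSize), fullChunks * ChunkFrameSize(chunkSize));
        Console.WriteLine("Last chunk ({0} bytes of payload): {1} bytes",
            lastChunk, ChunkFrameSize(lastChunk));
    }
}
```

Under these assumptions each full chunk costs 6 bytes of framing, and the 244 full chunks add up to 1000888 bytes, which matches the length printed at the end of the Data enumeration. The final 576-byte chunk would add another 582 bytes. One plausible reading, not verified against the protobuf-net source, is that this last chunk was still buffered by the writer when OnDataReadEnd fired.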

Prototype fiddle.

Is there a mechanism to write out packed repeated elements incrementally using bytes from a stream, knowing the length in advance?

It appears not. Assuming you have a stream for which CanSeek == true, you could encapsulate it in an IList<byte> that enumerates the bytes in the stream, provides random access to them, and returns the stream length in IList.Count. There is a sample fiddle here showing such an attempt. Unfortunately, however, ListDecorator.Write() simply enumerates the list and buffers its encoded contents before writing them to the output stream, which causes the input stream to be loaded completely into memory. I think this happens because protobuf-net encodes a List<byte> differently from a byte[], namely as a length-delimited sequence of Base 128 Varints. Since the Varint representation of a byte sometimes requires more than one byte, the length cannot be computed in advance from the list count. The linked answer gives more details on how the encodings of byte arrays and lists differ. It should be possible to encode an IList<byte> in the same way as a byte[], it just isn't currently available.
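A short sketch of why the count alone is not enough: if each byte is written as a Base 128 Varint, values below 128 take one byte and values from 128 to 255 take two, so two lists with the same Count can have different encoded lengths. PackedLengthSketch is a hypothetical helper that only does the arithmetic; it does not reproduce protobuf-net's actual writer.

```csharp
using System;

// Illustrative only: payload size of bytes encoded as varints.
public static class PackedLengthSketch
{
    // Total payload size when every element is written as a base-128 varint.
    public static int PackedVarintPayloadSize(byte[] values)
    {
        int size = 0;
        foreach (byte b in values)
            size += b < 0x80 ? 1 : 2;   // the high bit forces a second byte
        return size;
    }

    public static void Main()
    {
        var low = new byte[] { 1, 2, 3, 4 };
        var high = new byte[] { 200, 201, 202, 203 };

        // Both arrays have four elements, but the payload sizes differ.
        Console.WriteLine("low:  count = {0}, payload = {1}",
            low.Length, PackedVarintPayloadSize(low));
        Console.WriteLine("high: count = {0}, payload = {1}",
            high.Length, PackedVarintPayloadSize(high));
    }
}
```

A byte[] written as raw bytes does not have this problem, because its payload length always equals its element count.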