使用自定义流 (IEnumerable<T>)

Consuming a custom stream (IEnumerable<T>)

我正在使用 Stream 的自定义实现,它将 IEnumerable<T> 流式传输到流中。我正在使用这个 EnumerableStream 实现来执行转换。

我正在使用它以流式传输模式通过 WCF 执行流式传输。我能够毫无问题地将 IEnumerable 转换为流。有一次,我在客户端,我可以反序列化并获取所有数据,但是我无法找到停止循环我的流的条件。我得到:

System.Runtime.Serialization.SerializationException: End of Stream encountered before parsing was completed.

这是我要实现的示例示例:

class Program
{
    public static void Main()
    {
        var ListToSend = new List<List<string>>();
        var ListToReceive = new List<List<string>>();
        ListToSend = SimulateData().ToList();
        using (Stream stream = GetStream(ListToSend))
        {
            var formatter = new BinaryFormatter();
            while (stream.CanRead || 1 == 1 || true...) // What should I put in here to stop once I read everything???
            {
                List<string> row = formatter.Deserialize(stream) as List<string>;
                ListToReceive.Add(row);
            }
            Printer(ListToReceive);
            Console.WriteLine("Done");
        }
    }

    private static void Printer(List<List<string>> data)
    {
        Console.WriteLine("Printing");
        foreach (var row in data)
        {
            foreach (var cell in row)
            {
                Console.Write(cell + "\t");
            }
            Console.WriteLine("-------------------------------------------------------------------------------");
        }
    }
    private static Stream GetStream(IEnumerable<List<string>> data)
    {
        return EnumerableStream.Create(data, DeserializerCallback);
    }

    private static List<byte> DeserializerCallback(object obj)
    {
        var binFormatter = new BinaryFormatter();
        var mStream = new MemoryStream();
        binFormatter.Serialize(mStream, obj);
        return mStream.ToArray().ToList();
    }

    private static IEnumerable<List<string>> SimulateData()
    {
        Random randomizer = new Random();
        for (var i = 0; i < 10; i++)
        {
            var row = new List<string>();
            for (var j = 0; j < 1000; j++)
            {
                row.Add((randomizer.Next(100)).ToString());
            }
            yield return row;
        }
    }
}

我没有包含自定义流。我为那些想要查看完整代码的人创建了一个 fiddle

实现长度属性:

public override long Length 
{
    get 
    {
        return (_buf.Any() || SerializeNext()) ? 1 : 0;
    } 
}

然后检查长度:

        while (stream.Length > 0)
        {
            List<string> row = formatter.Deserialize(stream) as List<string>;
            ListToReceive.Add(row);
        }

我已经在你的 fiddle 上测试过了,效果很好。

这是与@TheSoftwareJedi 的解决方案非常相似的方法,但使用了长度 属性,在这种情况下,这将 return 您 "know" 的元素长度在流中。 据我所知,这并不反对 属性.

的意图使用

首先,您可以简单地序列化 List<List<string>> 本身。 Demo here。这消除了这个专门的 class 读取流的需要。并可能使这个答案变得毫无意义。一次一个地流式传输它的唯一目的可能是一个非常大的数据集。在这种情况下将需要不同的实现,这是以下解决方案可能解决的问题。

以下答案(和您的代码)要求读取流的客户端具有 EnumerableStream class.

Do I need to add something in the custom stream itself to notify that all the data have been read?

是的。你需要实现一个新的 属性 来知道你是否有另一个 T 要读取,或者使用 Length.

public bool HasMore { get { return _buf.Any() || SerializeNext();} }

public override long Length { get { return (_buf.Any() || SerializeNext()) ? 1 : 0; } }

我觉得可以清理整个解决方案以获得 IEnumerable<T> StreamReader。但是,这有效。

Here 是调整后的工作小提琴手。请注意,我也清理了一下。与另一个 class 同名的静态 class 让我很头疼;)。另外,我会改为byte[],而不是List<byte>

Is it because the format of the deserializer and serialiser are not the same (I don't think so).

没有

I also want to know why when I put a break point in the read function, the buffer size is changing randomly.

缓冲区_buf应该是当前序列化项目的大小。这可能因项目而异。

I would prefer not to wrap the code with a try and catch, I want a clean solution that does not crash.

明智的做法是不要只吞下异常,而是了解如何使其按预期工作。

Do I need to add something in the custom stream itself to notify that all the data have been read?

可以,但是在接收到的 Stream 与 class 不同的 class.

的 WCF 场景中这无济于事

有两种标准的(官方的,设计的)方法来确定 Stream 数据的结尾:

(1) ReadByte 返回 -1

Returns

The unsigned byte cast to an Int32, or -1 if at the end of the stream.

(2) Readcount > 0

调用时返回 0

Returns

The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.

不幸的是,它们都消耗当前字节(前进到下一个字节)并将破坏反序列化器。

可能的解决方案是什么?

首先,实施一些 serialization/deserialization 格式(协议),让您知道是否有更多元素要反序列化。例如,List<T> 在元素之前存储 CountT[] 在元素之前存储 Length 等。由于 EnumerableStream<T> 事先不知道计数,一个简单的解决方案将在每个元素之前发出一个假字节:

private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;

    buf.Enqueue(1); // <--
    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);

    return true;
}

这将允许您使用

while (stream.ReadByte() != -1)
{
    // ...
}

其次,如果您想保留当前格式,更通用的解决方案是实现自定义流,它包装另一个流并实现 PeekByte 方法,其语义与标准 [=23] 相同=],但 不消耗 当前字节:

public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte;

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
} 

这基本上实现了具有 0 或 1 字节预读功能的只读流。

用法是这样的:

using (var stream = new SequentialStream(GetStream(ListToSend)))
{
    // ...
    while (stream.PeekByte() != -1) 
    {
        // ...
    }
    // ...
}

P.S。怎么样

I also want to know why when I put a break point in the read function, the buffer size is changing randomly.

这不是随机的。 BinaryFormatter 在内部使用 BinaryReader 来读取 Int32ByteString 等键入的值,将所需的大小传递为 count,例如4、1、字符串编码字节数(它知道是因为在实际数据之前将它们存储在流中并在尝试读取实际数据之前读取它)等。