从 Stream.Read() 在没有数据时等待的字符串创建一个空流

Create an empty stream from a string on which Stream.Read() waits when no data

我正在尝试用我从字符串创建的流替换我从 TcpClient.GetStream() 获得的流。

我正在使用以下方法创建所述流:

public Stream GenerateStreamFromString(string s)
{
    MemoryStream stream = new MemoryStream();
    StreamWriter writer = new StreamWriter(stream);
    writer.Write(s);
    writer.Flush();
    stream.Position = 0;
    return stream;
}

然而,这个流是通过使用 Stream.Read() 在图书馆的某个地方读取的,我不想改变。问题是我用一个空字符串创建这个流,因为对象需要一个流来启动,通常在使用 TcpClient 流时它会停止在 Stream.Read() 直到它有东西要读取但不是我从我的字符串创建的流。

所以我的问题是,如何创建一个空流,稍后我可以向其中添加来自字符串的数据?

在内部使用 BlockingCollection<> 作为队列,你可以这样写:

public class WatitingStream : Stream
{
    private BlockingCollection<byte[]> Packets = new BlockingCollection<byte[]>();

    private byte[] IncompletePacket;
    private int IncompletePacketOffset;

    public WatitingStream()
    {
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            Packets.CompleteAdding();
        }

        base.Dispose(disposing);
    }

    public override bool CanRead
    {
        get { return Packets.IsCompleted; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return Packets.IsAddingCompleted; }
    }

    public override void Flush()
    {
    }

    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0)
        {
            return 0;
        }

        byte[] packet;
        int packetOffset;

        if (IncompletePacket != null)
        {
            packet = IncompletePacket;
            packetOffset = IncompletePacketOffset;
        }
        else
        {
            if (Packets.IsCompleted)
            {
                return 0;
            }

            packet = Packets.Take();
            packetOffset = 0;
        }

        int read = Math.Min(packet.Length - packetOffset, count);
        Buffer.BlockCopy(packet, packetOffset, buffer, offset, read);

        packetOffset += read;

        if (packetOffset < packet.Length)
        {
            IncompletePacket = packet;
            IncompletePacketOffset = packetOffset;
        }
        else
        {
            IncompletePacket = null;
            IncompletePacketOffset = 0;
        }

        return read;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0)
        {
            return;
        }

        byte[] packet = new byte[count];
        Buffer.BlockCopy(buffer, offset, packet, 0, count);
        Packets.Add(packet);
    }
}

您将其用作普通流。 Write 不会阻塞。 Read 块。

必须做出一些决定:此 Stream 基于 "packet"。它不会 Write 零长度数据包,而 Read 将 return 一个数据包的数据。 Read 不会在下一个数据包上继续。如果在 Read 之后数据包中仍有数据剩余,则该数据将保存到下一个 ReadDispose() 将停止 Write(因此如果 "client" 在 "server" 之前执行 Dispose(),如果服务器尝试执行Write)。如果 "server" 先执行 Dispose(),"client" 可以完成读取仍然存在的数据包。显然可以(容易)将此 class 分成两个 class(一个 Server 和一个 Client),其中 Server 保留 BlockingCollection<> 并且客户端有对 "server" 的引用。这将解决“Dispose()”anomaly/problem(但会使代码大小加倍:-))