从 Stream.Read() 在没有数据时等待的字符串创建一个空流
Create an empty stream from a string on which Stream.Read() waits when no data
我正在尝试用我从字符串创建的流替换我从 TcpClient.GetStream()
获得的流。
我正在使用以下方法创建所述流:
public Stream GenerateStreamFromString(string s)
{
MemoryStream stream = new MemoryStream();
StreamWriter writer = new StreamWriter(stream);
writer.Write(s);
writer.Flush();
stream.Position = 0;
return stream;
}
然而,这个流是通过使用 Stream.Read()
在图书馆的某个地方读取的,我不想改变。问题是我用一个空字符串创建这个流,因为对象需要一个流来启动,通常在使用 TcpClient
流时它会停止在 Stream.Read()
直到它有东西要读取但不是我从我的字符串创建的流。
所以我的问题是,如何创建一个空流,稍后我可以向其中添加来自字符串的数据?
在内部使用 BlockingCollection<>
作为队列,你可以这样写:
public class WatitingStream : Stream
{
private BlockingCollection<byte[]> Packets = new BlockingCollection<byte[]>();
private byte[] IncompletePacket;
private int IncompletePacketOffset;
public WatitingStream()
{
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
Packets.CompleteAdding();
}
base.Dispose(disposing);
}
public override bool CanRead
{
get { return Packets.IsCompleted; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return Packets.IsAddingCompleted; }
}
public override void Flush()
{
}
public override long Length
{
get
{
throw new NotSupportedException();
}
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return 0;
}
byte[] packet;
int packetOffset;
if (IncompletePacket != null)
{
packet = IncompletePacket;
packetOffset = IncompletePacketOffset;
}
else
{
if (Packets.IsCompleted)
{
return 0;
}
packet = Packets.Take();
packetOffset = 0;
}
int read = Math.Min(packet.Length - packetOffset, count);
Buffer.BlockCopy(packet, packetOffset, buffer, offset, read);
packetOffset += read;
if (packetOffset < packet.Length)
{
IncompletePacket = packet;
IncompletePacketOffset = packetOffset;
}
else
{
IncompletePacket = null;
IncompletePacketOffset = 0;
}
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return;
}
byte[] packet = new byte[count];
Buffer.BlockCopy(buffer, offset, packet, 0, count);
Packets.Add(packet);
}
}
您将其用作普通流。 Write
不会阻塞。 Read
块。
必须做出一些决定:此 Stream
基于 "packet"。它不会 Write
零长度数据包,而 Read
将 return 一个数据包的数据。 Read
不会在下一个数据包上继续。如果在 Read
之后数据包中仍有数据剩余,则该数据将保存到下一个 Read
。 Dispose()
将停止 Write
(因此如果 "client" 在 "server" 之前执行 Dispose()
,如果服务器尝试执行Write
)。如果 "server" 先执行 Dispose()
,"client" 可以完成读取仍然存在的数据包。显然可以(容易)将此 class 分成两个 class(一个 Server
和一个 Client
),其中 Server
保留 BlockingCollection<>
并且客户端有对 "server" 的引用。这将解决“Dispose()
”anomaly/problem(但会使代码大小加倍:-))
我正在尝试用我从字符串创建的流替换我从 TcpClient.GetStream()
获得的流。
我正在使用以下方法创建所述流:
public Stream GenerateStreamFromString(string s)
{
MemoryStream stream = new MemoryStream();
StreamWriter writer = new StreamWriter(stream);
writer.Write(s);
writer.Flush();
stream.Position = 0;
return stream;
}
然而,这个流是通过使用 Stream.Read()
在图书馆的某个地方读取的,我不想改变。问题是我用一个空字符串创建这个流,因为对象需要一个流来启动,通常在使用 TcpClient
流时它会停止在 Stream.Read()
直到它有东西要读取但不是我从我的字符串创建的流。
所以我的问题是,如何创建一个空流,稍后我可以向其中添加来自字符串的数据?
在内部使用 BlockingCollection<>
作为队列,你可以这样写:
public class WatitingStream : Stream
{
private BlockingCollection<byte[]> Packets = new BlockingCollection<byte[]>();
private byte[] IncompletePacket;
private int IncompletePacketOffset;
public WatitingStream()
{
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
Packets.CompleteAdding();
}
base.Dispose(disposing);
}
public override bool CanRead
{
get { return Packets.IsCompleted; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return Packets.IsAddingCompleted; }
}
public override void Flush()
{
}
public override long Length
{
get
{
throw new NotSupportedException();
}
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return 0;
}
byte[] packet;
int packetOffset;
if (IncompletePacket != null)
{
packet = IncompletePacket;
packetOffset = IncompletePacketOffset;
}
else
{
if (Packets.IsCompleted)
{
return 0;
}
packet = Packets.Take();
packetOffset = 0;
}
int read = Math.Min(packet.Length - packetOffset, count);
Buffer.BlockCopy(packet, packetOffset, buffer, offset, read);
packetOffset += read;
if (packetOffset < packet.Length)
{
IncompletePacket = packet;
IncompletePacketOffset = packetOffset;
}
else
{
IncompletePacket = null;
IncompletePacketOffset = 0;
}
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return;
}
byte[] packet = new byte[count];
Buffer.BlockCopy(buffer, offset, packet, 0, count);
Packets.Add(packet);
}
}
您将其用作普通流。 Write
不会阻塞。 Read
块。
必须做出一些决定:此 Stream
基于 "packet"。它不会 Write
零长度数据包,而 Read
将 return 一个数据包的数据。 Read
不会在下一个数据包上继续。如果在 Read
之后数据包中仍有数据剩余,则该数据将保存到下一个 Read
。 Dispose()
将停止 Write
(因此如果 "client" 在 "server" 之前执行 Dispose()
,如果服务器尝试执行Write
)。如果 "server" 先执行 Dispose()
,"client" 可以完成读取仍然存在的数据包。显然可以(容易)将此 class 分成两个 class(一个 Server
和一个 Client
),其中 Server
保留 BlockingCollection<>
并且客户端有对 "server" 的引用。这将解决“Dispose()
”anomaly/problem(但会使代码大小加倍:-))