记忆流网络模拟
Networksimulation with memorystream
我必须模拟一个带有节点和 MemoryStream
的网络,我想将对象消息从一个节点发送到另一个节点。
问题是,当我想反序列化流中的 IMessage
对象时,我总是遇到异常。另外,我如何模拟server.AcceptClient
?
异常:
System.Runtime.Serialization.SerializationException: "The input stream does not have a valid binary format. The start contents (in bytes) are: 05-01-00-00-00-22-50-65-65-72-5F-74-6F -5F-50-65-65 ... "
代码:
public class Network
{
public List<Node> Nodes { get; set; }
public Network()
{
Nodes = new List<Node>();
}
public MemoryStream GetClientStream(string ip)
{
foreach (var item in Nodes)
{
if (item.Ip == ip)
{
return item.Stream;
}
}
return null;
}
}
public class Node
{
public string Ip { get; set; }
public Client ChordClient { get; set; }
public MemoryStream Stream { get; set; }
public Network ChordChain { get; set; }
public Node(string ip, string peerOne, Network network)
{
ChordChain = network;
Ip = ip;
Console.WriteLine($"Peer with Port {ip} is listening");
Task.Run(() => openServer());
}
private void openServer()
{
try
{
Byte[] bytes = new Byte[256];
Stream = new MemoryStream();
// Enter the listening loop.
while (true)
{
int i;
while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
{
Stream.Position = 0;
IMessage msg = NodeTest.DeserializeFromStream(Stream);
msg.DoOperate();
Stream.Flush();
}
}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
finally
{
// Stop listening for new clients.
}
}
public void Connect(String ip)
{
try
{
IMessage msg = new Message();
MemoryStream stream = ChordChain.GetClientStream(ip);
NodeTest.SerializeToStream(stream, msg);
Console.WriteLine("Sent: {0}", "sd");
}
catch (ArgumentNullException e)
{
Console.WriteLine("ArgumentNullException: {0}", e);
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch (Exception ex)
{
}
}
}
public class NodeTest
{
public static MemoryStream SerializeToStream(MemoryStream stream, object o)
{
IFormatter formatter = new BinaryFormatter();
formatter.Serialize(stream, o);
return stream;
}
public static IMessage DeserializeFromStream(MemoryStream stream)
{
stream.Position = 0;
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return (IMessage)(o);
}
}
public class Message : IMessage
{
public void DoOperate()
{
Console.WriteLine("Hello");
Console.ReadKey();
}
}
public interface IMessage
{
void DoOperate();
}
感谢您的帮助,但现在我遇到了问题,如何才能发送对象的固定大小以使其正常工作
public class Node
{
public string Ip { get; set; }
public Client ChordClient { get; set; }
public MemoryStream Stream { get; set; }
public Network ChordChain {get; set; }
public Node(string ip, string peerOne, Network network)
{
ChordChain = network;
Ip = ip;
Console.WriteLine($"Peer with Port {ip} is listening");
Task.Run(() => openServer());
}
private void openServer()
{
try
{
NamedPipeServerStream server = new NamedPipeServerStream(Ip);
Byte[] bytes = new Byte[256];
int i;
while (true)
{
server.WaitForConnection();
while ((i = server.Read(bytes, 0, bytes.Length)) != 0)
{
//Stream.Position = 0;
IMessage msg = NodeTest.DeserializeFromStream(server);
msg.DoOperate();
}
}
//MemoryStream reader = new MemoryStream(server);
//StreamWriter writer = new StreamWriter(server);
//Byte[] bytes = new Byte[256];
//Stream = new MemoryStream();
//// Enter the listening loop.
//while (true)
//{
// int i;
// while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
// {
// //Stream.Position = 0;
// IMessage msg = NodeTest.DeserializeFromStream(Stream);
// msg.DoOperate();
// Stream.Flush();
// }
//}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
finally
{
// Stop listening for new clients.
}
}
public void Connect(String ip)
{
try
{
IMessage msg = new Message();
NamedPipeClientStream client = new NamedPipeClientStream(ip);
client.Connect();
NodeTest.SerializeToStream(client, msg);
Console.WriteLine("Sent: {0}", "sd");
client.Close();
}
catch (ArgumentNullException e)
{
Console.WriteLine("ArgumentNullException: {0}", e);
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch(Exception ex)
{
}
}
}
public class NodeTest
{
public static void SerializeToStream(NamedPipeClientStream stream, object o)
{
IFormatter formatter = new BinaryFormatter();
formatter.Serialize(stream, o);
}
public static IMessage DeserializeFromStream(NamedPipeServerStream stream)
{
//stream.Position = 0;
IFormatter formatter = new BinaryFormatter();
//stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return (IMessage)(o);
}
}
我不确定这有什么问题,但你肯定不知道你是否有完整的消息。
如果你像这样发送序列化的消息,那么每一方都很难知道它是否收到了整个消息。解决这个问题的方法是构建序列化消息,例如您发送:
...
[size][serialized message]
[size][serialized message]
...
所以发送给你(1)序列化到内存(2)得到多少字节的大小(3)写入大小(4)写入多少字节
并接收你 (1) 读取大小 (2) 将那么多字节读入内存 (3) 反序列化
我会先解决这个问题。
您对 MemoryStream 的使用不是线程安全的。您不能在一个线程上重新定位和读取流,而另一个线程正在写入流。要对这种类型的异步通信建模,请使用管道流:
NamedPipeServerStream server = new NamedPipeServerStream("MyPipe");
server.WaitForConnection();
StreamReader reader = new StreamReader(server);
StreamWriter writer = new StreamWriter(server);
[...]
NamedPipeClientStream client = new NamedPipeClientStream("MyPipe");
client.Connect();
StreamReader reader = new StreamReader(client);
StreamWriter writer = new StreamWriter(client);
我必须模拟一个带有节点和 MemoryStream
的网络,我想将对象消息从一个节点发送到另一个节点。
问题是,当我想反序列化流中的 IMessage
对象时,我总是遇到异常。另外,我如何模拟server.AcceptClient
?
异常:
System.Runtime.Serialization.SerializationException: "The input stream does not have a valid binary format. The start contents (in bytes) are: 05-01-00-00-00-22-50-65-65-72-5F-74-6F -5F-50-65-65 ... "
代码:
public class Network
{
public List<Node> Nodes { get; set; }
public Network()
{
Nodes = new List<Node>();
}
public MemoryStream GetClientStream(string ip)
{
foreach (var item in Nodes)
{
if (item.Ip == ip)
{
return item.Stream;
}
}
return null;
}
}
public class Node
{
public string Ip { get; set; }
public Client ChordClient { get; set; }
public MemoryStream Stream { get; set; }
public Network ChordChain { get; set; }
public Node(string ip, string peerOne, Network network)
{
ChordChain = network;
Ip = ip;
Console.WriteLine($"Peer with Port {ip} is listening");
Task.Run(() => openServer());
}
private void openServer()
{
try
{
Byte[] bytes = new Byte[256];
Stream = new MemoryStream();
// Enter the listening loop.
while (true)
{
int i;
while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
{
Stream.Position = 0;
IMessage msg = NodeTest.DeserializeFromStream(Stream);
msg.DoOperate();
Stream.Flush();
}
}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
finally
{
// Stop listening for new clients.
}
}
public void Connect(String ip)
{
try
{
IMessage msg = new Message();
MemoryStream stream = ChordChain.GetClientStream(ip);
NodeTest.SerializeToStream(stream, msg);
Console.WriteLine("Sent: {0}", "sd");
}
catch (ArgumentNullException e)
{
Console.WriteLine("ArgumentNullException: {0}", e);
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch (Exception ex)
{
}
}
}
public class NodeTest
{
public static MemoryStream SerializeToStream(MemoryStream stream, object o)
{
IFormatter formatter = new BinaryFormatter();
formatter.Serialize(stream, o);
return stream;
}
public static IMessage DeserializeFromStream(MemoryStream stream)
{
stream.Position = 0;
IFormatter formatter = new BinaryFormatter();
stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return (IMessage)(o);
}
}
public class Message : IMessage
{
public void DoOperate()
{
Console.WriteLine("Hello");
Console.ReadKey();
}
}
public interface IMessage
{
void DoOperate();
}
感谢您的帮助,但现在我遇到了问题,如何才能发送对象的固定大小以使其正常工作
public class Node
{
public string Ip { get; set; }
public Client ChordClient { get; set; }
public MemoryStream Stream { get; set; }
public Network ChordChain {get; set; }
public Node(string ip, string peerOne, Network network)
{
ChordChain = network;
Ip = ip;
Console.WriteLine($"Peer with Port {ip} is listening");
Task.Run(() => openServer());
}
private void openServer()
{
try
{
NamedPipeServerStream server = new NamedPipeServerStream(Ip);
Byte[] bytes = new Byte[256];
int i;
while (true)
{
server.WaitForConnection();
while ((i = server.Read(bytes, 0, bytes.Length)) != 0)
{
//Stream.Position = 0;
IMessage msg = NodeTest.DeserializeFromStream(server);
msg.DoOperate();
}
}
//MemoryStream reader = new MemoryStream(server);
//StreamWriter writer = new StreamWriter(server);
//Byte[] bytes = new Byte[256];
//Stream = new MemoryStream();
//// Enter the listening loop.
//while (true)
//{
// int i;
// while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
// {
// //Stream.Position = 0;
// IMessage msg = NodeTest.DeserializeFromStream(Stream);
// msg.DoOperate();
// Stream.Flush();
// }
//}
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
finally
{
// Stop listening for new clients.
}
}
public void Connect(String ip)
{
try
{
IMessage msg = new Message();
NamedPipeClientStream client = new NamedPipeClientStream(ip);
client.Connect();
NodeTest.SerializeToStream(client, msg);
Console.WriteLine("Sent: {0}", "sd");
client.Close();
}
catch (ArgumentNullException e)
{
Console.WriteLine("ArgumentNullException: {0}", e);
}
catch (SocketException e)
{
Console.WriteLine("SocketException: {0}", e);
}
catch(Exception ex)
{
}
}
}
public class NodeTest
{
public static void SerializeToStream(NamedPipeClientStream stream, object o)
{
IFormatter formatter = new BinaryFormatter();
formatter.Serialize(stream, o);
}
public static IMessage DeserializeFromStream(NamedPipeServerStream stream)
{
//stream.Position = 0;
IFormatter formatter = new BinaryFormatter();
//stream.Seek(0, SeekOrigin.Begin);
object o = formatter.Deserialize(stream);
return (IMessage)(o);
}
}
我不确定这有什么问题,但你肯定不知道你是否有完整的消息。
如果你像这样发送序列化的消息,那么每一方都很难知道它是否收到了整个消息。解决这个问题的方法是构建序列化消息,例如您发送:
...
[size][serialized message]
[size][serialized message]
...
所以发送给你(1)序列化到内存(2)得到多少字节的大小(3)写入大小(4)写入多少字节
并接收你 (1) 读取大小 (2) 将那么多字节读入内存 (3) 反序列化
我会先解决这个问题。
您对 MemoryStream 的使用不是线程安全的。您不能在一个线程上重新定位和读取流,而另一个线程正在写入流。要对这种类型的异步通信建模,请使用管道流:
NamedPipeServerStream server = new NamedPipeServerStream("MyPipe");
server.WaitForConnection();
StreamReader reader = new StreamReader(server);
StreamWriter writer = new StreamWriter(server);
[...]
NamedPipeClientStream client = new NamedPipeClientStream("MyPipe");
client.Connect();
StreamReader reader = new StreamReader(client);
StreamWriter writer = new StreamWriter(client);