记忆流网络模拟

Networksimulation with memorystream

我必须模拟一个带有节点和 MemoryStream 的网络,我想将对象消息从一个节点发送到另一个节点。

问题是,当我想反序列化流中的 IMessage 对象时,我总是遇到异常。另外,我如何模拟server.AcceptClient

异常:

System.Runtime.Serialization.SerializationException: "The input stream does not have a valid binary format. The start contents (in bytes) are: 05-01-00-00-00-22-50-65-65-72-5F-74-6F -5F-50-65-65 ... "

代码:

public class Network
{
    public List<Node> Nodes { get; set; }

    public Network()
    {
        Nodes = new List<Node>();
    }

    public MemoryStream GetClientStream(string ip)
    {
        foreach (var item in Nodes)
        {
            if (item.Ip == ip)
            {
                return item.Stream;
            }
        }

        return null;
    }
}


public class Node
{
    public string Ip { get; set; }

    public Client ChordClient { get; set; }

    public MemoryStream Stream { get; set; }

    public Network ChordChain { get; set; }

    public Node(string ip, string peerOne, Network network)
    {
        ChordChain = network;
        Ip = ip;
        Console.WriteLine($"Peer with Port {ip} is listening");
        Task.Run(() => openServer());
    }

    private void openServer()
    {
        try
        {
            Byte[] bytes = new Byte[256];

            Stream = new MemoryStream();
            // Enter the listening loop.
            while (true)
            {
                int i;

                while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
                {
                    Stream.Position = 0;
                    IMessage msg = NodeTest.DeserializeFromStream(Stream);

                    msg.DoOperate();

                    Stream.Flush();
                }
            }
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        finally
        {
            // Stop listening for new clients.
        }
    }

    public void Connect(String ip)
    {
        try
        {
            IMessage msg = new Message();

            MemoryStream stream = ChordChain.GetClientStream(ip);

            NodeTest.SerializeToStream(stream, msg);

            Console.WriteLine("Sent: {0}", "sd");
        }
        catch (ArgumentNullException e)
        {
            Console.WriteLine("ArgumentNullException: {0}", e);
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        catch (Exception ex)
        {

        }


    }

}

public class NodeTest
{

    public static MemoryStream SerializeToStream(MemoryStream stream, object o)
    {
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(stream, o);

        return stream;
    }

    public static IMessage DeserializeFromStream(MemoryStream stream)
    {

        stream.Position = 0;
        IFormatter formatter = new BinaryFormatter();
        stream.Seek(0, SeekOrigin.Begin);
        object o = formatter.Deserialize(stream);

        return (IMessage)(o);
    }
}

public class Message : IMessage
{

    public void DoOperate()
    {
        Console.WriteLine("Hello");
        Console.ReadKey();
    }

}

public interface IMessage
{
    void DoOperate();
}

感谢您的帮助,但现在我遇到了问题,如何才能发送对象的固定大小以使其正常工作

public class Node
{
    public string Ip { get; set; }

    public Client ChordClient { get; set; }

    public MemoryStream Stream { get; set; }

    public Network ChordChain {get; set; }

    public Node(string ip, string peerOne, Network network)
    {
        ChordChain = network;
        Ip = ip;
        Console.WriteLine($"Peer with Port {ip} is listening");
        Task.Run(() => openServer());
    }

    private void openServer()
    {
        try
        {
            NamedPipeServerStream server = new NamedPipeServerStream(Ip);
            Byte[] bytes = new Byte[256];
            int i;

            while (true)
            {
                server.WaitForConnection();

                while ((i = server.Read(bytes, 0, bytes.Length)) != 0)
                {

                    //Stream.Position = 0;
                    IMessage msg = NodeTest.DeserializeFromStream(server);

                    msg.DoOperate();
                }

            }

            //MemoryStream reader = new MemoryStream(server);
            //StreamWriter writer = new StreamWriter(server);

            //Byte[] bytes = new Byte[256];

            //Stream = new MemoryStream();
            //// Enter the listening loop.
            //while (true)
            //{
            //    int i;

            //    while ((i = Stream.Read(bytes, 0, bytes.Length)) != 0)
            //    {
            //        //Stream.Position = 0;
            //        IMessage msg = NodeTest.DeserializeFromStream(Stream);

            //        msg.DoOperate();

            //        Stream.Flush();
            //    }               
            //}
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        finally
        {
            // Stop listening for new clients.
        }
    }

    public void Connect(String ip)
    {
        try
        {              
            IMessage msg = new Message();


            NamedPipeClientStream client = new NamedPipeClientStream(ip);
            client.Connect();

            NodeTest.SerializeToStream(client, msg);

            Console.WriteLine("Sent: {0}", "sd");

            client.Close();
        }
        catch (ArgumentNullException e)
        {
            Console.WriteLine("ArgumentNullException: {0}", e);
        }
        catch (SocketException e)
        {
            Console.WriteLine("SocketException: {0}", e);
        }
        catch(Exception ex)
        {

        }  
    }

}

public class NodeTest
{

    public static void SerializeToStream(NamedPipeClientStream stream, object o)
    { 
        IFormatter formatter = new BinaryFormatter();
        formatter.Serialize(stream, o);
    }

    public static IMessage DeserializeFromStream(NamedPipeServerStream   stream)
    {

        //stream.Position = 0;
        IFormatter formatter = new BinaryFormatter();
        //stream.Seek(0, SeekOrigin.Begin);
        object o = formatter.Deserialize(stream);

        return (IMessage)(o);
    }  
}

我不确定这有什么问题,但你肯定不知道你是否有完整的消息。

如果你像这样发送序列化的消息,那么每一方都很难知道它是否收到了整个消息。解决这个问题的方法是构建序列化消息,例如您发送:

...
[size][serialized message]
[size][serialized message]
...

所以发送给你(1)序列化到内存(2)得到多少字节的大小(3)写入大小(4)写入多少字节

并接收你 (1) 读取大小 (2) 将那么多字节读入内存 (3) 反序列化

我会先解决这个问题。

您对 MemoryStream 的使用不是线程安全的。您不能在一个线程上重新定位和读取流,而另一个线程正在写入流。要对这种类型的异步通信建模,请使用管道流:

 NamedPipeServerStream server = new NamedPipeServerStream("MyPipe");
 server.WaitForConnection();
 StreamReader reader = new StreamReader(server);
 StreamWriter writer = new StreamWriter(server);

 [...]

 NamedPipeClientStream client = new NamedPipeClientStream("MyPipe");
 client.Connect();
 StreamReader reader = new StreamReader(client);
 StreamWriter writer = new StreamWriter(client);