Multithreaded server gives "Cannot access a disposed object" error when multiple clients try to connect

I have a multithreaded server, server.cs, and a client, client.cs. The objective of the program is as follows:

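Each client program creates a connection to the server. The server replies with a welcome message (a string). The client sends a JSON-formatted string to the server. The server replies in the same format, with the secret and an end status added. The client then sends a message telling the server to stop the communication, and closes the connection. Once all clients have communicated, a final client with id = -1 notifies the server to stop. When the server receives the message from this ending client (id = -1), it must print all the collected information and the number of clients that communicated.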

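Most of it is done. When I use the SequentialSimulation() method in client.cs and run many instances of the client.cs program, the server works fine and behaves as described above. But when I use the ConcurrentSimulation() method in client.cs and run just one instance of client.cs, it crashes with the following error: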

Unhandled exception. System.AggregateException: One or more errors occurred. (Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.)
 ---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.
   at System.Net.Sockets.Socket.get_RemoteEndPoint()
   at SocketClient.Client.endCommunication() in /Users/test/Documents/client/Program.cs:line 143
   at SocketClient.ClientsSimulator.SequentialSimulation() in /Users/test/Documents/client/Program.cs:line 174
   at SocketClient.ClientsSimulator.<ConcurrentSimulation>b__5_0() in /Users/test/Documents/client/Program.cs:line 191
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__274_0(Object obj)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at SocketClient.ClientsSimulator.ConcurrentSimulation() in /Users/test/Documents/client/Program.cs:line 197
   at SocketClient.Program.Main(String[] args) in /Users/test/Documents/client/Program.cs:line 219

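The goal of ConcurrentSimulation() is to have multiple clients connect to the server without running many instances of the client.cs program (the server should be able to handle 200 clients at once, and running 200 instances of client.cs is a lot of work).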

Please help me solve this problem. The code is below.

Server.cs:

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;

namespace SocketServer
{
    public class ClientInfo
    {
        public string studentnr { get; set; }
        public string classname { get; set; }
        public int clientid { get; set; }
        public string teamname { get; set; }
        public string ip { get; set; }
        public string secret { get; set; }
        public string status { get; set; }
    }

    public class Message
    {
        public const string welcome = "WELCOME";
        public const string stopCommunication = "COMC-STOP";
        public const string statusEnd = "STAT-STOP";
        public const string secret = "SECRET";
    }

    public class SequentialServer
    {
        public Socket listener;
        public IPEndPoint localEndPoint;
        // Defining the IP address
        public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        // Defining the portnumber
        public readonly int portNumber = 11111;

        public String results = "";
        public LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();

        private Boolean stopCond = false;
        private int processingTime = 1000;
        private int listeningQueueSize = 5;

        public void prepareServer()
        {
            byte[] bytes = new Byte[1024];
            String data = null;
            int numByte = 0;
            string replyMsg = "";
            bool stop;

            try
            {
                Console.WriteLine("[Server] is ready to start ...");
                // Establish the local endpoint
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                Console.Out.WriteLine("[Server] A socket is established ...");
                // associate a network address to the Server Socket. All clients must know this address
                listener.Bind(localEndPoint);
                // This is a non-blocking listen with max number of pending requests
                listener.Listen(listeningQueueSize);
                while (true)
                {
                    Console.WriteLine("Waiting connection ... ");
                    // Suspend while waiting for incoming connection 
                    Socket connection = listener.Accept();
                    this.sendReply(connection, Message.welcome);

                    stop = false;
                    while (!stop)
                    {
                        numByte = connection.Receive(bytes);
                        data = Encoding.ASCII.GetString(bytes, 0, numByte);
                        replyMsg = processMessage(data);
                        if (replyMsg.Equals(Message.stopCommunication))
                        {
                            stop = true;
                            break;
                        }
                        else
                            this.sendReply(connection, replyMsg);
                    }

                }

            }
            catch (Exception e)
            {
                Console.Out.WriteLine(e.Message);
            }
        }
        public void handleClient(Socket con)
        {
        }
        public string processMessage(String msg)
        {
            Thread.Sleep(processingTime);
            Console.WriteLine("[Server] received from the client -> {0} ", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.stopCommunication:
                        replyMsg = Message.stopCommunication;
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        clients.AddLast(c);
                        if (c.clientid == -1)
                        {
                            stopCond = true;
                            exportResults();
                        }
                        c.secret = c.studentnr + Message.secret;
                        c.status = Message.statusEnd;
                        replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
                        break;
                }
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Server] processMessage {0}", e.Message);
            }

            return replyMsg;
        }
        public void sendReply(Socket connection, string msg)
        {
            byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
            connection.Send(encodedMsg);
        }
        public void exportResults()
        {
            if (stopCond)
            {
                this.printClients();
            }
        }
        public void printClients()
        {
            string delimiter = " , ";
            Console.Out.WriteLine("[Server] This is the list of clients communicated");
            foreach (ClientInfo c in clients)
            {
                Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
            }
            Console.Out.WriteLine("[Server] Number of handled clients: {0}", clients.Count);

            clients.Clear();
            stopCond = false;

        }
    }



    public class ConcurrentServer
    {
        public  static Socket listener;
        public static IPEndPoint localEndPoint;
        public static List<HostInfo> hosts;
        public static IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        public static readonly int portNumber = 11111;
        public static LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();

        private static Boolean stopCond = false;
        private static int processingTime = 1000;
        private static int listeningQueueSize = 5;


        public void prepareServer()
        {


            try
            {
                Console.WriteLine("[ConcurrentServer] is ready to start ...");


                listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                listener.Bind(localEndPoint);
                // A list of clients// to do: change the name if the list
                hosts = new List<HostInfo>();
                Thread listenThread = new Thread(ListenThread);

                listenThread.Start();

                Console.WriteLine("Waiting connection ... ");

            }
            catch (Exception e)
            {
            }


        }

        public static  void ClientInfoThread(object clientSocket)
        {
            // A thread that handles the incoming and outgoing messages.
            Socket cSocket = (Socket) clientSocket;

            byte[] bytes = new Byte[1024];
            String data = null;
            int numByte = 0;
            string replyMsg = "";
            bool stop;

            try

            {
            while (true)
            {
                // Suspend while waiting for incoming connection 
                //Socket connection = listener.Accept();
                Console.WriteLine("Waiting connection ... ");
                sendReply(cSocket, Message.welcome);

                stop = false;
                while (!stop)
                {
                    numByte = cSocket.Receive(bytes);
                    data = Encoding.ASCII.GetString(bytes, 0, numByte);
                    replyMsg = processMessage(data);
                    if (replyMsg.Equals(Message.stopCommunication))
                    {
                        stop = true;
                        break;
                    }
                    else
                        sendReply(cSocket, replyMsg);
                }

            }


            }catch(Exception e){
                //catches exception
            }

        }
        // a thread that adds each client socket  that is trying to connect, in the list.
        public static void ListenThread()
        {
            for( ; ; )
            {
            listener.Listen(0);
            hosts.Add(new HostInfo(listener.Accept()));  
            }
        }
        public  static string processMessage(String msg)
        {
            Thread.Sleep(processingTime);
            Console.WriteLine("[ConcurrentServer] received from the client -> {0} ", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.stopCommunication:
                        replyMsg = Message.stopCommunication;
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        clients.AddLast(c);
                        if (c.clientid == -1)
                        {
                            stopCond = true;
                            exportResults();
                        }
                        c.secret = c.studentnr + Message.secret;
                        c.status = Message.statusEnd;
                        replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
                        break;
                }
            }
            catch (Exception e)
            {
            }

            return replyMsg;
        }
        public static void sendReply(Socket connection, string msg)
        {
            byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
            connection.Send(encodedMsg);
        }
        public static void exportResults()
        {
            if (stopCond)
            {
                printClients();
            }
        }
        public static void printClients()
        {
            string delimiter = " , ";
            Console.Out.WriteLine("[ConcurrentServer] This is the list of clients communicated");
            foreach (ClientInfo c in clients)
            {
                Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
            }
            Console.Out.WriteLine("[ConcurrentServer] Number of handled clients: {0}", clients.Count);

            clients.Clear();
            stopCond = false;

        }





    }

    public class HostInfo{

        public Socket hostSocket;
        public Thread hostThread;
        public string id;
        public HostInfo()
        {
            id = Guid.NewGuid().ToString();
            hostThread= new Thread(ConcurrentServer.ClientInfoThread);
            hostThread.Start(hostSocket);

        }
         public HostInfo(Socket hostSocket)
        {
            this.hostSocket = hostSocket;
            id = Guid.NewGuid().ToString();
            hostThread= new Thread(ConcurrentServer.ClientInfoThread);
            hostThread.Start(hostSocket);

        }

    }



    public class ServerSimulator
    {
        public static void sequentialRun()
        {
            Console.Out.WriteLine("[Server] A sample server, sequential version ...");
            SequentialServer server = new SequentialServer();
            server.prepareServer();
        }
        public static void concurrentRun()

        {
            Console.Out.WriteLine("[ConcurrentServer] A sample server, concurrent version ...");
            ConcurrentServer server = new ConcurrentServer();
            server.prepareServer();
        }
    }
    class Program
    {
        // Main Method 
        static void Main(string[] args)
        {
            Console.Clear();
           //ServerSimulator.sequentialRun();
            // todo: uncomment this when the solution is ready.
            ServerSimulator.concurrentRun();
        }

    }
}

client.cs:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading;

namespace SocketClient
{
    public class ClientInfo
    {
        public string studentnr { get; set; }
        public string classname { get; set; }
        public int clientid { get; set; }
        public string teamname { get; set; }
        public string ip { get; set; }
        public string secret { get; set; }
        public string status { get; set; }
    }

    public class Message
    {
        public const string welcome = "WELCOME";
        public const string stopCommunication = "COMC-STOP";
        public const string statusEnd = "STAT-STOP";
        public const string secret = "SECRET";
    }

    public class Client
    {
        public Socket clientSocket;
        private ClientInfo info;
        public IPEndPoint localEndPoint;
        public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        public readonly int portNumber = 11111;
        public readonly int minWaitingTime = 50, maxWaitingTime = 100;
        public int waitingTime = 0;
        string baseStdNumber = "0700";

        private String msgToSend;

        public Client(bool finishing, int n)
        {
            waitingTime = new Random().Next(minWaitingTime, maxWaitingTime);
            info = new ClientInfo();
            info.classname = " INF2X ";
            info.studentnr = this.baseStdNumber + n.ToString();
            info.ip = "127.0.0.1";
            info.clientid = finishing ? -1 : 1;
        }

        public string getClientInfo()
        {
            return JsonSerializer.Serialize<ClientInfo>(info);
        }
        public void prepareClient()
        {
            try
            {
                // Establish the remote endpoint for the socket.
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                // Creation TCP/IP Socket using  
                clientSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Client] Preparation failed: {0}", e.Message);
            }
        }
        public string processMessage(string msg)
        {
            Console.WriteLine("[Client] from Server -> {0}", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.welcome:
                        replyMsg = this.getClientInfo();
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        if (c.status == Message.statusEnd)
                        {
                            replyMsg = Message.stopCommunication;
                        }
                        break;
                }
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Client] processMessage {0}", e.Message);
            }
            return replyMsg;
        }
        public void startCommunication()
        {
            Console.Out.WriteLine("[Client] **************");
            Thread.Sleep(waitingTime);
            // Data buffer 
            byte[] messageReceived = new byte[1024];
            int numBytes = 0;
            String rcvdMsg = null;
            Boolean stop = false;
            string reply = "";

            try
            {
                // Connect Socket to the remote endpoint 
                clientSocket.Connect(localEndPoint);
                // print connected EndPoint information  
                Console.WriteLine("[Client] connected to -> {0} ", clientSocket.RemoteEndPoint.ToString());

                while (!stop)
                {
                    // Receive the message using the method Receive().
                    numBytes = clientSocket.Receive(messageReceived);
                    rcvdMsg = Encoding.ASCII.GetString(messageReceived, 0, numBytes);
                    reply = this.processMessage(rcvdMsg);
                    this.sendReply(reply);
                    if (reply.Equals(Message.stopCommunication))
                    {
                        stop = true;
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
        public void sendReply(string msg)
        {
            // Create the message to send
            Console.Out.WriteLine("[Client] Message to be sent: {0}", msg);
            byte[] messageSent = Encoding.ASCII.GetBytes(msg);
            int byteSent = clientSocket.Send(messageSent);
        }
        public void endCommunication()
        {
            Console.Out.WriteLine("[Client] End of communication to -> {0} ", clientSocket.RemoteEndPoint.ToString());
            clientSocket.Shutdown(SocketShutdown.Both);
            clientSocket.Close();
        }
    }

    public class ClientsSimulator
    {
        private int numberOfClients;
        private Client[] clients;
        public readonly int waitingTimeForStop = 2000;


        public ClientsSimulator(int n, int t)
        {
            numberOfClients = n;
            clients = new Client[numberOfClients];
            for (int i = 0; i < numberOfClients; i++)
            {
                clients[i] = new Client(false, i);
            }
        }

        public void SequentialSimulation()
        {
            while(true){
            Console.Out.WriteLine("\n[ClientSimulator] Sequential simulator is going to start ...");
            for (int i = 0; i < numberOfClients; i++)
            {
                clients[i].prepareClient();
                clients[i].startCommunication();
                clients[i].endCommunication();
            }

            Console.Out.WriteLine("\n[ClientSimulator] All clients finished with their communications ... ");

           Thread.Sleep(waitingTimeForStop);

            Client endClient = new Client(true, -1);
            endClient.prepareClient();
            endClient.startCommunication();
            endClient.endCommunication();
        }
        }

        public void ConcurrentSimulation()
        {
            Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
            var t = Task.Run(() => SequentialSimulation() );
            var t1 = Task.Run(() => SequentialSimulation() );
            var t2 = Task.Run(() => SequentialSimulation() );
            var t3 = Task.Run(() => SequentialSimulation() );
            var t4= Task.Run(() => SequentialSimulation() );
            var t5 = Task.Run(() => SequentialSimulation() );
            t.Wait();
            t1.Wait();
            t2.Wait();
            t3.Wait();
            t4.Wait();
            t5.Wait();



        }
    }
    class Program
    {
        // Main Method 
        static void Main(string[] args)
        {
            Console.Clear();
            int wt = 5000, nc = 20;
            ClientsSimulator clientsSimulator = new ClientsSimulator(nc, wt);
            //clientsSimulator.SequentialSimulation();
            Thread.Sleep(wt);
            // todo: Uncomment this, after finishing the method.
            clientsSimulator.ConcurrentSimulation();

        }
    }
}

You have a race condition.

Look at this:

    public void ConcurrentSimulation()
    {
        Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
        var t = Task.Run(() => SequentialSimulation());
        var t1 = Task.Run(() => SequentialSimulation());
        var t2 = Task.Run(() => SequentialSimulation());
        var t3 = Task.Run(() => SequentialSimulation());
        var t4 = Task.Run(() => SequentialSimulation());
        var t5 = Task.Run(() => SequentialSimulation());
        t.Wait();
        t1.Wait();
        t2.Wait();
        t3.Wait();
        t4.Wait();
        t5.Wait();
    }

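You run SequentialSimulation concurrently six times (tasks t through t5), and all six runs share the same clients array. Each call to prepareClient() assigns a new Socket to that client's clientSocket field, so six sockets get created for every client while the field only keeps the one assigned last. The rest of the code in every task then runs against that same socket.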

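This means that at some point one task closes the socket while the remaining tasks keep trying to use the same underlying clientSocket, and you get the error. In your stack trace it surfaces in endCommunication(), where clientSocket.RemoteEndPoint is read after another task has already called Close() on that socket.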
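To see the failure in isolation, here is a minimal sketch that reproduces it without any threads: one piece of code closes a socket, and another piece that still holds the same reference reads RemoteEndPoint. The names DisposedSocketDemo and ThrowsAfterClose are made up for this illustration, and the loopback TcpListener on an OS-assigned port is only a stand-in for your server. On a runtime that behaves like the one in your stack trace, the method should return true.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class DisposedSocketDemo
{
    // Returns true if reading RemoteEndPoint after Close() throws
    // ObjectDisposedException, which is what the stack trace reports.
    public static bool ThrowsAfterClose()
    {
        // Assumption: port 0 asks the OS for any free loopback port.
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            var endPoint = (IPEndPoint)listener.LocalEndpoint;
            var shared = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            shared.Connect(endPoint);

            // The "first task" finishes its client and closes the socket.
            shared.Close();

            try
            {
                // The "second task" still holds the same reference and uses it.
                _ = shared.RemoteEndPoint;
                return false;
            }
            catch (ObjectDisposedException)
            {
                return true;
            }
        }
        finally
        {
            listener.Stop();
        }
    }

    public static void Main()
    {
        Console.WriteLine(ThrowsAfterClose());
    }
}
```

In your program the two accesses happen on different tasks, so whether and where it fails depends on timing, which is why it looks random.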

Each run of SequentialSimulation needs its own set of Client objects, and you also need to make sure that you never touch a socket after it has been disposed.
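One way to arrange that is sketched below, under some assumptions: SimClient is a hypothetical, stripped-down stand-in for your Client class (it only connects, reads RemoteEndPoint, and closes), and the small accept loop in Demo stands in for your server so the example runs on its own. The point it illustrates is that every task constructs its own client objects, so no Socket reference is ever shared between tasks, and each socket is read only before its own Close().

```csharp
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Client: one instance owns one socket.
public sealed class SimClient
{
    private readonly IPEndPoint server;

    public SimClient(IPEndPoint server) { this.server = server; }

    public void Run()
    {
        var socket = new Socket(server.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            socket.Connect(server);
            // Safe: this socket is local to this call and not yet closed.
            _ = socket.RemoteEndPoint;
            socket.Shutdown(SocketShutdown.Both);
        }
        finally
        {
            socket.Close();
        }
    }
}

public static class PerTaskClientsDemo
{
    // Each task creates its own clients instead of sharing one array.
    public static int RunConcurrently(IPEndPoint server, int tasks, int clientsPerTask)
    {
        int completed = 0;
        Task[] runs = Enumerable.Range(0, tasks).Select(_ => Task.Run(() =>
        {
            for (int i = 0; i < clientsPerTask; i++)
            {
                new SimClient(server).Run();
                Interlocked.Increment(ref completed);
            }
        })).ToArray();
        Task.WaitAll(runs);
        return completed;
    }

    // Runs the clients against a throwaway loopback listener (assumption:
    // this replaces the real server only for the purpose of the demo).
    public static int Demo(int tasks, int clientsPerTask)
    {
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        var server = (IPEndPoint)listener.LocalEndpoint;
        int expected = tasks * clientsPerTask;

        Task acceptLoop = Task.Run(() =>
        {
            for (int i = 0; i < expected; i++)
            {
                using (Socket s = listener.AcceptSocket())
                {
                    // Blocks until the client shuts down its side, then returns 0.
                    s.Receive(new byte[1]);
                }
            }
        });

        int done = RunConcurrently(server, tasks, clientsPerTask);
        acceptLoop.Wait();
        listener.Stop();
        return done;
    }

    public static void Main()
    {
        Console.WriteLine(Demo(3, 4));
    }
}
```

Applied to your code, this would roughly mean constructing the Client instances inside SequentialSimulation (or passing each task its own array) rather than in the ClientsSimulator constructor. Task.WaitAll also replaces the six separate Wait() calls.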