创建一个每秒能够处理数千个请求的 TCP 套接字服务器

create a TCP socket server which is able to handle thousands of requests per second

第一次尝试时,我创建了一个基本的 TCP 服务器:

public class Tcp
{
    private  TcpListener listener { get; set; }
    private  bool accept { get; set; } = false;

    public  void StartServer(string ip, int port)
    {
        IPAddress address = IPAddress.Parse(ip);
        listener = new TcpListener(address, port);

        listener.Start();
        accept = true;
        StartListener();
        Console.WriteLine($"Server started. Listening to TCP clients at {ip}:{port}");
    }
    public async void StartListener() //non blocking listener
    {

        listener.Start();
        while (true)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
                HandleClient(client);
            }
            finally { }
        }
    }
    private void HandleClient(TcpClient client)
    {
        try
        {

            NetworkStream networkStream = client.GetStream();
            byte[] bytesFrom = new byte[20];
            networkStream.Read(bytesFrom, 0, 20);
            string dataFromClient = System.Text.Encoding.ASCII.GetString(bytesFrom);
            string serverResponse = "Received!";
            Byte[] sendBytes = Encoding.ASCII.GetBytes(serverResponse);
            networkStream.Write(sendBytes, 0, sendBytes.Length);
            networkStream.Flush();
        }
        catch(Exception ex)
        {
        }
    }
}

我编写了一个客户端测试代码,用于发送和记录每秒的请求数

public class Program
{
    private volatile static Dictionary<int, int> connections = new Dictionary<int, int>();
    private volatile static int fail = 0;
    private static string message = "";
    public static void Main(string[] args)
    {
        ServicePointManager.DefaultConnectionLimit = 1000000;
        ServicePointManager.Expect100Continue = false;
        for (int i = 0; i < 512; i++)
        {
            message += "T";
        }

        int taskCount = 10;
        int requestsCount = 1000;
        var taskList = new List<Task>();
        int seconds = 0;
        Console.WriteLine($"start : {DateTime.Now.ToString("mm:ss")} ");

        for (int i = 0; i < taskCount; i++)
        {

            taskList.Add(Task.Factory.StartNew(() =>
            {
                for (int j = 0; j < requestsCount; j++)
                {
                    Send();
                }
            }));
        }
        Console.WriteLine($"threads stablished : {DateTime.Now.ToString("mm: ss")}");
        while (taskList.Any(t => !t.IsCompleted)) { Thread.Sleep(5000); }
        Console.WriteLine($"Compelete : {DateTime.Now.ToString("mm: ss")}");
        int total = 0;
        foreach (KeyValuePair<int, int> keyValuePair in connections)
        {
            Console.WriteLine($"{keyValuePair.Key}:{keyValuePair.Value}");
            total += keyValuePair.Value;
            seconds++;
        }
        Console.WriteLine($"succeded:{total}\tfail:{fail}\tseconds:{seconds}");
        Console.WriteLine($"End : {DateTime.Now.ToString("mm: ss")}");
        Console.ReadKey();
    }

    private static void Send()
    {
        try
        {
            TcpClient tcpclnt = new TcpClient();
            tcpclnt.ConnectAsync("192.168.1.21", 5678).Wait();
            String str = message;
            Stream stm = tcpclnt.GetStream();

            ASCIIEncoding asen = new ASCIIEncoding();
            byte[] ba = asen.GetBytes(str);

            stm.Write(ba, 0, ba.Length);

            byte[] bb = new byte[100];
            int k = stm.Read(bb, 0, 100);
            tcpclnt.Close();
            lock (connections)
            {
                int key = int.Parse(DateTime.Now.ToString("hhmmss"));
                if (!connections.ContainsKey(key))
                {
                    connections.Add(key, 0);
                }
                connections[key] = connections[key] + 1;
            }
        }
        catch (Exception e)
        {
            lock (connections)
            {
                fail += 1;
            }
        }
    }
}

当我在本地机器上测试时,我得到每秒 4000 个请求的最大数量,当我将它上传到本地局域网时,它减少到每秒 200 个请求。

问题是: 如何提高服务器性能? 负载测试套接字服务器的正确方法是什么?

您可能有一个“非阻塞侦听器”,但是当任何特定的客户端连接时,它只专注于该客户端,直到该客户端发送消息并向其发回响应。这不会很好地扩展。

我通常不喜欢 async void,但它符合您当前的代码:

public async void StartListener() //non blocking listener
{

    listener.Start();
    while (true)
    {
        TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
        HandleClient(client);
    }
}
private async void HandleClient(TcpClient client)
{
    NetworkStream networkStream = client.GetStream();
    byte[] bytesFrom = new byte[20];
    int totalRead = 0;
    while(totalRead<20)
    {
        totalRead += await networkStream.ReadAsync(bytesFrom, totalRead, 20-totalRead).ConfigureAwait(false);
    }
    string dataFromClient = System.Text.Encoding.ASCII.GetString(bytesFrom);
    string serverResponse = "Received!";
    Byte[] sendBytes = Encoding.ASCII.GetBytes(serverResponse);
    await networkStream.WriteAsync(sendBytes, 0, sendBytes.Length).ConfigureAwait(false);
    networkStream.Flush(); /* Not sure necessary */
}

我还修复了我在评论中提到的关于从 Read 中忽略 return 值的错误,并删除了“对我隐藏错误,使错误无法在野外发现”错误处理。

如果您不能保证您的客户端将始终向此代码发送 20 字节的消息,那么您需要执行其他操作,以便服务器知道要读取多少数据。这通常是通过在消息前加上长度前缀或使用某种形式的标记值来指示结束来完成的。请注意,即使使用长度前缀,您也不能保证一次读取整个 length,因此您还需要如上所述使用读取循环来发现长度优先。


如果将所有内容切换到 async 并不能提供您需要的规模,那么您需要放弃使用 NetworkStream 并开始在 Socket 级别工作,特别是设计用于 SocketAsyncEventArgs:

的异步方法

The SocketAsyncEventArgs class is part of a set of enhancements to the System.Net.Sockets.Socket class that provide an alternative asynchronous pattern that can be used by specialized high-performance socket applications... An application can use the enhanced asynchronous pattern exclusively or only in targeted hot areas (for example, when receiving large amounts of data).

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O...

In the new System.Net.Sockets.Socket class enhancements, asynchronous socket operations are described by reusable SocketAsyncEventArgs objects allocated and maintained by the application...