优雅地关闭 TcpListener,等待现有的 TcpClients 完成

Gracefully shutting down a TcpListener, waiting for existing TcpClients to finish

我正在尝试正常关闭 TCPListener - 意思是,如果有任何客户端已连接,请等待该请求得到服务,然后正常断开连接。

 namespace Server {
    class Program{
        static void Main(string[]args) {
            Console.WriteLine("Starting Server");
            CancellationTokenSource cts = new CancellationTokenSource();
            Server cc = new Server();
            // In production, this will not be a task, but its own thread
            var t = cc.StartListener(cts.Token);
            Console.WriteLine("Server Started - Press any key to exit to stop the listener");
            Console.ReadKey(true);
            Console.WriteLine("\r\nStopping the listener");
            cts.Cancel();
            // Wait for the task (that should be a thread to finish 
            Task[] ts = new Task[1]{ t };
            Task.WaitAll(ts);
            Console.WriteLine("\r\nListener stopped - exiting");
        }
    }

    public class Server{
        public async Task StartListener(CancellationToken cts) {
            tcpListener = new TcpListener(IPAddress.Any, 4321);
            tcpListener.Start();

            Console.WriteLine();
            // Keep accepting clients until the cancellation token
            while (!cts.IsCancellationRequested) {
                var tcpClient = await tcpListener.AcceptTcpClientAsync().ConfigureAwait(false);
                // Increment the count of outstanding clients
                Interlocked.Increment(ref c);
                // When we are done, use a continuation to decrement the count
                ProcessClient(tcpClient).ContinueWith((_t) => Interlocked.Decrement(ref c));
                Console.Write("\b\b\b\b" + c);
            }

            Console.WriteLine($ "\r\nWaiting for {c} connections to finish");
            // Stop the listener
            tcpListener.Stop();

            // Stick around until all clients are done
            while (c > 0) {}
            Console.WriteLine("Done");
        }

        int c = 0;

        public TcpListener tcpListener;

        static Random random = new Random();

        private async Task ProcessClient(TcpClient tcpClient) {
            var ns = tcpClient.GetStream();
            try {
                byte[]b = new byte[16];
                await ns.ReadAsync(b, 0, 16);
                // Introduce a random delay to simulate 'real world' conditions
                await Task.Delay(random.Next(100, 500)); 
                // Write back the payload we receive (should be a guid, i.e 16-bytes)
                await ns.WriteAsync(b, 0, 16);
            } catch (Exception ex) {
                Console.WriteLine(ex.Message);
            }
            finally {
                tcpClient.Close();
            }
        }
    }
}

这是我的客户

namespace client{
    class Program{
        static void Main(string[]args) {
            List<Task> ts = new List<Task>();

            for (int i = 0; i < 5000; i++) {
                var t = Connect(i);
                ts.Add(t);
            }

            Task.WaitAll(ts.ToArray());
            Console.WriteLine("done - exiting, but first \r\n");

            // Group all the messages so they are only output once
            foreach(var m in messages.GroupBy(x => x).Select(x => (x.Count() + " " + x.Key))) {
                Console.WriteLine(m);
            }
        }

        static object o = new Object();

        static Random r = new Random();
        static List <string> messages = new List <string> ();

        static async Task Connect(int i) {
            try {
                // Delay below will simulate requests coming in over time
                await Task.Delay(r.Next(0, 10000));
                TcpClient tcpClient = new TcpClient();
                await tcpClient.ConnectAsync("127.0.0.1", 4321);
                using(var ns = tcpClient.GetStream()) {
                    var g = Guid.NewGuid();
                    // Send a guid
                    var bytes = g.ToByteArray();
                    await ns.WriteAsync(bytes, 0, 16);
                    // Read guid back out
                    var outputBytes = new byte[16];
                    await ns.ReadAsync(outputBytes, 0, 16);
                    // Did we get the right value back?
                    var og = new Guid(outputBytes);
                }
            } catch (Exception ex) {
                lock(o) {
                    var message = ex.Message.Length <= 150 ? ex.Message + "..." : ex.Message.Substring(0, 149);
                    if (messages.IndexOf(message) == -1) {}
                    messages.Add(message);
                }
            }
        }
    }
}

如果我停止服务器,但客户端继续运行,显然,我会得到一堆

No connection could be made because the target machine actively refused it. [::ffff:127.0.0.1]:4321...

这是预期的 - 我不明白的是为什么客户端仍然报告一些连接(很少)被强制关闭。

Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host.....

That's expected - what I don't understand, is why the client still reports some connections (very few) as forcibly closed.

有两件事阻碍了您的期望:您没有在连接上使用优雅的关闭,并且(更重要的是)客户端连接被网络驱动程序代表您接受并放入 "backlog" .

我修改了你的代码示例以显示关键点的当前系统时间:

  • 对于客户端,产生任何给定错误消息的最早时间
  • 对于服务器来说,用户按下某个键的时间,以及服务器认为所有客户端已经成功关闭的时间

我还在服务器中的每个报告之后添加了两秒钟的等待时间,这样监听套接字的实际关闭和进程的关闭都比报告发生得晚得多。我这样做是为了更容易将客户端进程的输出与服务器进程的输出及其操作相关联。第一次延迟还允许积压完全填满,因此更容易看出其影响(见下文)。第二次延迟有助于确保进程本身的关闭不会影响连接的处理方式。

服务器的输出如下所示:

Starting Server

Server Started - Press any key to exit to stop the listener
163
Stopping the listener
156
Waiting for 156 connections to finish (current time is 04:10:27.040)
Done (exiting at 04:10:29.048)

Listener stopped - exiting

客户端输出如下所示:

done - exiting, but first

200: Unable to read data from the transport connection: An existing connection was forcibly closed
by the remote host... (earliest: 04:10:29.047) 2514: No connection could be made because the target machine actively refused it 127.0.0.1:4321...
(earliest: 04:10:29.202)

注意:

  • 客户端套接字因 "unable to read" 失败的最早 时间是服务器首次报告后两秒。 IE。几乎就在服务器实际关闭侦听套接字的时候。
  • 同样,客户端套接字因 "no connection could be made" 失败的最早时间仅比最早的 "unable to read" 错误晚一点。重要的是,它也是 服务器在最后建立的客户端完成后立即显示的消息之后。

边栏:我还对您的代码进行了一些其他修改。特别是,我添加了明确的正常关闭,以将其作为潜在问题消除。事实上,您会注意到使用我的代码版本,您只会遇到读取或连接错误。如果没有优雅的关闭逻辑,偶尔(但很少)一些错误也会是写入错误,这是客户端和服务器之间竞争的结果。

看到读取错误消息的数量恰好是 200 条也很有指导意义。

所以,这是怎么回事?

  • 读取错误是客户端代表服务器的网络驱动程序而非服务器程序本身接受了连接的结果。 IE。这些是监听套接字积压中的连接。据客户端所知,他们的连接已被接受,但实际上服务器进程从未真正接受过。所以网络驱动不得不强行关闭这些连接。

    这也是为什么读取错误的数量恰好是200个很有趣的原因。在过去,[=上的默认(和最大值) 87=] 非服务器 OS 的积压只有 5 个,而服务器 OS 允许积压 200 个。我是 运行 Windows 10 "Pro",因此我推测 Microsoft 也增加了 OS 的 "Pro" 版本的默认积压。我猜想 "Home" 版本仍然具有旧的默认值 5,但我对此可能是错误的。

    请注意,在现实中,由于线程调度和时序问题,实际数量有时可能会略高于实际积压值。但总会很近的。
  • 连接错误是客户端在侦听套接字实际 关闭 后尝试连接的结果。这是您在关闭套接字时所期望的正常情况。

底线:none 与服务器实际完全建立的连接有错误。唯一发生过的错误是在服务器开始关闭所有程序时发生的。发生的确切错误取决于客户端能够进入连接过程的程度。

我会注意到,除了缺少优雅关闭之外,您的代码还有其他一些问题:Random class 不是线程安全的,并且没有 gua 运行请注意读取操作将 return 您期望的字节数(如果有任何数据要接收,读取操作可能 return 只需要一个字节)。

我认为就您提供的概念验证示例而言,这两个其他问题可能无关紧要。 Random 在不安全地使用时不会 return 正确的高斯分布,并且您的代码示例实际上并不关心是否接收到当前字节。我 运行 修复了这些问题的代码版本,它似乎没有影响整体行为,我也没有预料到它。

但归根结底,在处理网络时 I/O,尤其是在客户端和服务器没有紧密协调的情况下(例如,服务器只有在确认没有将请求更多的客户端连接……在现实生活中,这几乎 永远不会 发生 :) ),错误是生活中的事实,代码 must 面对他们要坚强