Swift-NIO + WebSocket-Kit:在 Mac 应用程序中正确 Setup/Cleanup

Swift-NIO + WebSocket-Kit: Proper Setup/Cleanup in a Mac App

上下文

我正在开发 Mac 应用程序。在这个应用程序中,我想要 运行 一个 websocket 服务器。为此,我使用了 Swift NIO 和 Websocket-Kit。我的完整设置如下。

问题

Websocket-Kit 和 SwiftNIO 的所有文档都旨在创建一个服务器端进程,当您从命令行启动它时启动它,然后 运行s无限。

在我的应用程序中,我必须能够启动 websocket 服务器,然后关闭它并按需重新启动它,而无需重新启动我的应用程序。下面的代码可以做到这一点,但我想确认两件事:

  1. test() 函数中,我向所有连接的客户端发送一些文本。我不确定这是否线程安全且正确。我可以存储 WebSocket 个实例,就像我在这里所做的那样,并从我的应用程序的主线程向它们发送消息吗?

  2. 我是否正确关闭了 websocket 服务器?调用 serverBootstrap(group:)[...].bind(host:port:).wait() 的结果创建一个 Channel,然后无限等待。当我在关联的 EventLoopGroup 上调用 shutdownGracefully() 时,该服务器是否已正确清理? (我可以确认端口 5759 在这次关闭后再次空闲,所以我 猜测 一切都清理干净了吗?)

感谢您的投入;很难找到在应用程序中使用 SwiftNIO 和 Websocket-Kit 的示例。

代码

import Foundation
import NIO
import NIOHTTP1
import NIOWebSocket
import WebSocketKit


@objc class WebsocketServer: NSObject
{
    private var queue: DispatchQueue?
    private var eventLoopGroup: MultiThreadedEventLoopGroup?
    private var websocketClients: [WebSocket] = []
    
    
    @objc func startServer()
    {
        queue = DispatchQueue.init(label: "socketServer")
        queue?.async
        {
            let upgradePipelineHandler: (Channel, HTTPRequestHead) -> EventLoopFuture<Void> = { channel, req in
                
                WebSocket.server(on: channel) { ws in
                    ws.send("You have connected to WebSocket")
                    
                    DispatchQueue.main.async {
                        self.websocketClients.append(ws)
                        print("websocketClients after connection: \(self.websocketClients)")
                    }
                
                    ws.onText { ws, string in
                        print("received")
                        ws.send(string.trimmingCharacters(in: .whitespacesAndNewlines).reversed())
                    }
                
                    ws.onBinary { ws, buffer in
                        print(buffer)
                    }
                
                    ws.onClose.whenSuccess { value in
                        print("onClose")
                        
                        DispatchQueue.main.async
                        {
                            self.websocketClients.removeAll { (socketToTest) -> Bool in
                                return socketToTest === ws
                            }
                            
                            print("websocketClients after close: \(self.websocketClients)")
                        }
                    }
                }
            }

            self.eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
            let port: Int = 5759

            let promise = self.eventLoopGroup!.next().makePromise(of: String.self)
            
            let server = try? ServerBootstrap(group: self.eventLoopGroup!)
                
                // Specify backlog and enable SO_REUSEADDR for the server itself
                .serverChannelOption(ChannelOptions.backlog, value: 256)
                .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
                
                .childChannelInitializer { channel in
              
                let webSocket = NIOWebSocketServerUpgrader(
                    shouldUpgrade: { channel, req in
                        return channel.eventLoop.makeSucceededFuture([:])
                    },
                    upgradePipelineHandler: upgradePipelineHandler
                )
              
                return channel.pipeline.configureHTTPServerPipeline(
                    withServerUpgrade: (
                        upgraders: [webSocket],
                        completionHandler: { ctx in
                            // complete
                        })
                )
            }.bind(host: "0.0.0.0", port: port).wait()                  

            _ = try! promise.futureResult.wait()
        }
    }
    
    
    
    ///
    ///  Send a message to connected clients, then shut down the server.
    ///
    @objc func test()
    {
        self.websocketClients.forEach { (ws) in
            ws.eventLoop.execute {
                ws.send("This is a message being sent to all websockets.")
            }
        }
        
        stopServer()
    }
    
    
    
    @objc func stopServer()
    {
        self.websocketClients.forEach { (ws) in
            try? ws.eventLoop.submit { () -> Void in
                print("closing websocket: \(ws)")
                _ = ws.close()
            }.wait()                        // Block until complete so we don't shut down the eventLoop before all clients get closed.
        }
        
        eventLoopGroup?.shutdownGracefully(queue: .main, { (error: Error?) in

            print("Eventloop shutdown now complete.")
            self.eventLoopGroup = nil
            self.queue = nil
        })
    }
}

In the test() function, I send some text to all connected clients. I am unsure if this is thread-safe and correct. Can I store the WebSocket instances as I'm doing here and message them from the main thread of my application?

正如您在这里所做的那样,是的,那应该是安全的。 ws.eventLoop.execute 将在属于该 WebSocket 连接的事件循环线程上执行该块。这样就安全了。

When I call shutdownGracefully() on the associated EventLoopGroup, is that server cleaned up correctly? (I can confirm that port 5759 is free again after this shutdown, so I'm guessing everything is cleaned up?)

是的。 shutdownGracefully 强制关闭所有连接和侦听套接字。