具有两个事件循环的异步 Web 套接字应用程序服务器

Asynchronous web socket application server with two event loops

我正在尝试制作一个使用 websockets 作为其主要界面的分布式 RPC 类型的 Web 应用程序。我想使用排队系统(如 RabbitMQ)来分配通过 websocket 连接请求的昂贵作业。


  1. 客户端通过 websocket 连接向服务器发送作业
  2. 服务器会将此消息发送到 RabbitMQ 交换器以供工作人员处理
  3. 工作人员将执行作业并将作业的结果添加到响应队列
  4. 服务器会检查响应队列并通过 websocket 连接将作业结果发送回客户端。

据我所知,在服务器上我需要两个共享内存的事件循环。 Websocket 服务器需要监听传入的作业,RabbitMQ 消费者需要监听作业结果以发送回客户端。


我对 websockets 和分布式计算都不熟悉,所以我真的不知道其中哪一个(或者可能是我没想到的)最适合我。

由于您可以让多个客户端同时发送作业,因此您需要一个多线程服务器。除非您的应用程序会为每个客户处理客户。现在有多种实现多线程服务器的方法,每种方法都有自己的 advantages/disadvantages。通过 :

  1. 每个请求一个线程(+:吞吐量可能最大化,-:线程创建成本高,必须管理并发)
  2. 每个客户端一个线程(+:线程管理开销更少,-:不能扩展到很多连接,仍然管理并发)
  3. 一个线程池(+:避免创建线程的开销,可扩展至 N 个并发连接(N = 线程池的大小),- : 管理N个线程之间的并发)

您可以选择上述方法之一 (我会选择每个客户端一个线程,因为它相对容易实现,而且您有可能拥有数以万计的客户端比较小).

请注意,这是一种多线程方法,不是事件驱动方法!但是因为你不限于一个线程(在这种情况下它应该是事件驱动的以便能够处理多个客户端“并发”)我不会选择那个选项实施起来比较困难。 (程序员有时会谈论事件驱动方法中的“回调地狱”)。

这就是我的实现方式(每个客户端一个线程,Java) :

public class Server {
    private static ServerSocket server_skt;
    private static ... channel; // channel to communicate with the rabbitMQ distributed priority queue.

    // Constructor
    Server(int port) {
        server_skt = new ServerSocket(port);
         * Set up connection with the distributed queue
         * channel = ...;

    public static void main(String argv[]) {
        Server server = new Server(5555); // Make server instance

        while(true) {
            // Always waiting for new clients to connect
            try {
                System.out.println("Waiting for a client to connect...");
                // Spawn new thread for communication with client (hence one thread per client approach)
                new CommunicationThread(server_skt.accept(), server.channel).start(); // Will listen for new jobs and send them
            } catch(IOException e) {
                System.out.println("Exception occured :" + e.getStackTrace());
public class CommunicationThread extends Thread {
    private Socket client_socket;
    private InputStream client_in;
    private OutputStream client_out;
    private ... channel; // Channel to communicate with rabbitMQ
    private ... resultQueue;

    public CommunicationThread(Socket socket, ... channel) { // replace ... by type of the rabbitMQ channel
        try {
            this.client_socket = socket;
            this.client_in = client_socket.getInputStream();
            this.client_out = client_socket.getOutputStream(); 
            this.channel = channel;
            this.resultQueue = ...;

            System.out.println("Client connected : " + client_socket.getInetAddress().toString());
        } catch(IOException e) {
            System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
    public yourJobType readJob() {
        // Read input from client (e.g. read a String from "client_in")
        // Make a job from it (e.g. map String to a job)
        // return the job

    public void run() {
        while(active) {
             * Always listen for incoming jobs (sent by client) and for results (to be sent back to client)
            // Read client input (only if available, else it would be blocking!)
            if(client_in.available() > 0) {
                yourJobType job = readJob();
                channel.basicPublish(...); // Send job to rabbitMQ
            /* Check result queue (THIS is why reading client input MUST be NON-BLOCKING! Else while loop could be blocked on reading input
             * and the result queue won't be checked until next job arrives)
            ResultType next_result = resultQueue.poll(); // Could be "null" if the queue is empty
            if(next_result != null) {
                // There is a result



要解决此问题,您可以 poll() 使用过滤器和通配符 (*) 的结果队列,或者为每个客户端设置一个结果队列,从而知道从我们的队列中检索到的结果将被发送到对应客户端。

(*) :您可以为每个客户端分配一个 ID。从客户端接收作业时,将作业与客户端 ID 配对(例如在元组 < clientID, job > 中)并将其放入队列中。并对结果执行相同操作(将结果与客户端 ID 配对并将其放入结果队列中)。然后在 CommunicationThreadrun() 方法中,您将只需要轮询结果队列以获得 < clientID, ? >.

重要提示:您还必须为每个作业分配一个 ID!因为发送作业 A 然后发送作业 B 并不能保证作业 A 的结果会在作业 B 的结果之前出现。(作业 B 可能比作业 A 耗时更少,因此结果可以在作业之前发送回客户端A的成绩).

(PS : 看你怎么实现worker了(由server一个线程一个worker执行?还是其他进程执行?))

以上答案是一种可能的多线程解决方案。我只讨论了服务器部分,客户端部分应该发送作业并等待结果(如何实现这个取决于你的目标,客户端是否先发送 all 作业然后接收每个作业的结果工作还是可以混合使用?)。
