具有两个事件循环的异步 Web 套接字应用程序服务器

Asynchronous web socket application server with two event loops

我正在尝试制作一个使用 websockets 作为其主要界面的分布式 RPC 类型的 Web 应用程序。我想使用排队系统(如 RabbitMQ)来分配通过 websocket 连接请求的昂贵作业。

基本上,流程是这样的:

  1. 客户端通过 websocket 连接向服务器发送作业
  2. 服务器会将此消息发送到 RabbitMQ 交换器以供工作人员处理
  3. 工作人员将执行作业并将作业的结果添加到响应队列
  4. 服务器会检查响应队列并通过 websocket 连接将作业结果发送回客户端。

据我所知,在服务器上我需要两个共享内存的事件循环。 Websocket 服务器需要监听传入的作业,RabbitMQ 消费者需要监听作业结果以发送回客户端。

适合我在这里使用的技术是什么?我考虑了以下几点:

我对 websockets 和分布式计算都不熟悉,所以我真的不知道其中哪一个(或者可能是我没想到的)最适合我。

As far as I can tell, on the server I need two event loops that share memory. The websocket server needs to be listening for incoming jobs, and a RabbitMQ consumer needs to be listening for job results to send back to the clients.

由于您可以让多个客户端同时发送作业,因此您需要一个多线程服务器。除非您的应用程序会为每个客户处理客户。现在有多种实现多线程服务器的方法,每种方法都有自己的 advantages/disadvantages。通过 :

看一下多线程
  1. 每个请求一个线程(+:吞吐量可能最大化,-:线程创建成本高,必须管理并发)
  2. 每个客户端一个线程(+:线程管理开销更少,-:不能扩展到很多连接,仍然管理并发)
  3. 一个线程池(+:避免创建线程的开销,可扩展至 N 个并发连接(N = 线程池的大小),- : 管理N个线程之间的并发)

您可以选择上述方法之一 (我会选择每个客户端一个线程,因为它相对容易实现,而且您有可能拥有数以万计的客户端比较小).

请注意,这是一种多线程方法,不是事件驱动方法!但是因为你不限于一个线程(在这种情况下它应该是事件驱动的以便能够处理多个客户端“并发”)我不会选择那个选项实施起来比较困难。 (程序员有时会谈论事件驱动方法中的“回调地狱”)。

这就是我的实现方式(每个客户端一个线程,Java) :

Basically, the flow would go like this:

  1. A client sends a job via websocket connection to the server

服务器部分:

public class Server {
    private static ServerSocket server_skt;
    private static ... channel; // channel to communicate with the rabbitMQ distributed priority queue.

    // Constructor
    Server(int port) {
        server_skt = new ServerSocket(port);
        
        /*
         * Set up connection with the distributed queue
         * channel = ...;
         */
    }

    public static void main(String argv[]) {
        Server server = new Server(5555); // Make server instance

        while(true) {
            // Always waiting for new clients to connect
            try {
                System.out.println("Waiting for a client to connect...");
                // Spawn new thread for communication with client (hence one thread per client approach)
                new CommunicationThread(server_skt.accept(), server.channel).start(); // Will listen for new jobs and send them
            } catch(IOException e) {
                System.out.println("Exception occured :" + e.getStackTrace());
            }
        }
    }
}
  1. The server would send this message to a RabbitMQ exchange to be processed by a worker
  2. ...
  3. The server would check the response queue and send the result of the job back to the client via websocket connection.
public class CommunicationThread extends Thread {
    private Socket client_socket;
    private InputStream client_in;
    private OutputStream client_out;
    private ... channel; // Channel to communicate with rabbitMQ
    private ... resultQueue;

    public CommunicationThread(Socket socket, ... channel) { // replace ... by type of the rabbitMQ channel
        try {
            this.client_socket = socket;
            this.client_in = client_socket.getInputStream();
            this.client_out = client_socket.getOutputStream(); 
            this.channel = channel;
            this.resultQueue = ...;

            System.out.println("Client connected : " + client_socket.getInetAddress().toString());
        } catch(IOException e) {
            System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
        }
    }
    
    public yourJobType readJob() {
        // Read input from client (e.g. read a String from "client_in")
        // Make a job from it (e.g. map String to a job)
        // return the job
    }

    @Override
    public void run() {
        while(active) {
            
            /*
             * Always listen for incoming jobs (sent by client) and for results (to be sent back to client)
             */
            
            // Read client input (only if available, else it would be blocking!)
            if(client_in.available() > 0) {
                yourJobType job = readJob();
                channel.basicPublish(...); // Send job to rabbitMQ
            }
            
            /* Check result queue (THIS is why reading client input MUST be NON-BLOCKING! Else while loop could be blocked on reading input
             * and the result queue won't be checked until next job arrives)
             */
            
            ResultType next_result = resultQueue.poll(); // Could be "null" if the queue is empty
            if(next_result != null) {
                // There is a result
                client_out.write(next_result.toByteArray());
                client_out.flush();
            }
        }
        
        client_in.close();
        client_out.close();
    }
}

注意从结果队列读取时,重要的是您只读取该客户端发送的作业结果。

如果您有一个包含作业结果的结果队列(所有客户端的)并且您检索了如上面代码中的结果,那么该结果可能是另一个客户端作业的结果,因此发送结果回错客户了。

要解决此问题,您可以 poll() 使用过滤器和通配符 (*) 的结果队列,或者为每个客户端设置一个结果队列,从而知道从我们的队列中检索到的结果将被发送到对应客户端。

(*) :您可以为每个客户端分配一个 ID。从客户端接收作业时,将作业与客户端 ID 配对(例如在元组 < clientID, job > 中)并将其放入队列中。并对结果执行相同操作(将结果与客户端 ID 配对并将其放入结果队列中)。然后在 CommunicationThreadrun() 方法中,您将只需要轮询结果队列以获得 < clientID, ? >.

重要提示:您还必须为每个作业分配一个 ID!因为发送作业 A 然后发送作业 B 并不能保证作业 A 的结果会在作业 B 的结果之前出现。(作业 B 可能比作业 A 耗时更少,因此结果可以在作业之前发送回客户端A的成绩).

(PS : 看你怎么实现worker了(由server一个线程一个worker执行?还是其他进程执行?))


以上答案是一种可能的多线程解决方案。我只讨论了服务器部分,客户端部分应该发送作业并等待结果(如何实现这个取决于你的目标,客户端是否先发送 all 作业然后接收每个作业的结果工作还是可以混合使用?)。

还有其他方法可以实现它,但对于分布式计算的初学者来说,我认为这是最简单的解决方案(使用线程池,...会使它变得更棘手)。