具有两个事件循环的异步 Web 套接字应用程序服务器
Asynchronous web socket application server with two event loops
我正在尝试制作一个使用 websockets 作为其主要界面的分布式 RPC 类型的 Web 应用程序。我想使用排队系统(如 RabbitMQ)来分配通过 websocket 连接请求的昂贵作业。
基本上,流程是这样的:
- 客户端通过 websocket 连接向服务器发送作业
- 服务器会将此消息发送到 RabbitMQ 交换器以供工作人员处理
- 工作人员将执行作业并将作业的结果添加到响应队列
- 服务器会检查响应队列并通过 websocket 连接将作业结果发送回客户端。
据我所知,在服务器上我需要两个共享内存的事件循环。 Websocket 服务器需要监听传入的作业,RabbitMQ 消费者需要监听作业结果以发送回客户端。
适合我在这里使用的技术是什么?我考虑了以下几点:
- 多线程应用程序并在每个线程上启动一个事件循环
- 通过 shm(共享内存)使用两个进程
- 使用两个通过套接字进行通信的进程(可以是 unix 套接字,甚至可以将工作人员设置为特殊的 websocket 客户端)
- 连接到 websocket 服务器的事件循环以检查结果队列
我对 websockets 和分布式计算都不熟悉,所以我真的不知道其中哪一个(或者可能是我没想到的)最适合我。
As far as I can tell, on the server I need two event loops that share memory. The websocket server needs to be listening for incoming jobs, and a RabbitMQ consumer needs to be listening for job results to send back to the clients.
由于您可以让多个客户端同时发送作业,因此您需要一个多线程服务器。除非您的应用程序会为每个客户处理客户。现在有多种实现多线程服务器的方法,每种方法都有自己的 advantages/disadvantages。通过 :
看一下多线程
- 每个请求一个线程(+:吞吐量可能最大化,-:线程创建成本高,必须管理并发)
- 每个客户端一个线程(+:线程管理开销更少,-:不能扩展到很多连接,仍然管理并发)
- 一个线程池(+:避免创建线程的开销,可扩展至 N 个并发连接(N = 线程池的大小),- : 管理N个线程之间的并发)
您可以选择上述方法之一 (我会选择每个客户端一个线程,因为它相对容易实现,而且您有可能拥有数以万计的客户端比较小).
请注意,这是一种多线程方法,不是事件驱动方法!但是因为你不限于一个线程(在这种情况下它应该是事件驱动的以便能够处理多个客户端“并发”)我不会选择那个选项实施起来比较困难。 (程序员有时会谈论事件驱动方法中的“回调地狱”)。
这就是我的实现方式(每个客户端一个线程,Java) :
Basically, the flow would go like this:
- A client sends a job via websocket connection to the server
服务器部分:
public class Server {
private static ServerSocket server_skt;
private static ... channel; // channel to communicate with the rabbitMQ distributed priority queue.
// Constructor
Server(int port) {
server_skt = new ServerSocket(port);
/*
* Set up connection with the distributed queue
* channel = ...;
*/
}
public static void main(String argv[]) {
Server server = new Server(5555); // Make server instance
while(true) {
// Always waiting for new clients to connect
try {
System.out.println("Waiting for a client to connect...");
// Spawn new thread for communication with client (hence one thread per client approach)
new CommunicationThread(server_skt.accept(), server.channel).start(); // Will listen for new jobs and send them
} catch(IOException e) {
System.out.println("Exception occured :" + e.getStackTrace());
}
}
}
}
- The server would send this message to a RabbitMQ exchange to be processed by a worker
- ...
- The server would check the response queue and send the result of the job back to the client via websocket connection.
public class CommunicationThread extends Thread {
private Socket client_socket;
private InputStream client_in;
private OutputStream client_out;
private ... channel; // Channel to communicate with rabbitMQ
private ... resultQueue;
public CommunicationThread(Socket socket, ... channel) { // replace ... by type of the rabbitMQ channel
try {
this.client_socket = socket;
this.client_in = client_socket.getInputStream();
this.client_out = client_socket.getOutputStream();
this.channel = channel;
this.resultQueue = ...;
System.out.println("Client connected : " + client_socket.getInetAddress().toString());
} catch(IOException e) {
System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
}
}
public yourJobType readJob() {
// Read input from client (e.g. read a String from "client_in")
// Make a job from it (e.g. map String to a job)
// return the job
}
@Override
public void run() {
while(active) {
/*
* Always listen for incoming jobs (sent by client) and for results (to be sent back to client)
*/
// Read client input (only if available, else it would be blocking!)
if(client_in.available() > 0) {
yourJobType job = readJob();
channel.basicPublish(...); // Send job to rabbitMQ
}
/* Check result queue (THIS is why reading client input MUST be NON-BLOCKING! Else while loop could be blocked on reading input
* and the result queue won't be checked until next job arrives)
*/
ResultType next_result = resultQueue.poll(); // Could be "null" if the queue is empty
if(next_result != null) {
// There is a result
client_out.write(next_result.toByteArray());
client_out.flush();
}
}
client_in.close();
client_out.close();
}
}
注意从结果队列读取时,重要的是您只读取该客户端发送的作业结果。
如果您有一个包含作业结果的结果队列(所有客户端的)并且您检索了如上面代码中的结果,那么该结果可能是另一个客户端作业的结果,因此发送结果回错客户了。
要解决此问题,您可以 poll()
使用过滤器和通配符 (*) 的结果队列,或者为每个客户端设置一个结果队列,从而知道从我们的队列中检索到的结果将被发送到对应客户端。
(*) :您可以为每个客户端分配一个 ID。从客户端接收作业时,将作业与客户端 ID 配对(例如在元组 < clientID, job > 中)并将其放入队列中。并对结果执行相同操作(将结果与客户端 ID 配对并将其放入结果队列中)。然后在 CommunicationThread
的 run()
方法中,您将只需要轮询结果队列以获得 < clientID, ? >.
重要提示:您还必须为每个作业分配一个 ID!因为发送作业 A 然后发送作业 B 并不能保证作业 A 的结果会在作业 B 的结果之前出现。(作业 B 可能比作业 A 耗时更少,因此结果可以在作业之前发送回客户端A的成绩).
(PS : 看你怎么实现worker了(由server一个线程一个worker执行?还是其他进程执行?))
以上答案是一种可能的多线程解决方案。我只讨论了服务器部分,客户端部分应该发送作业并等待结果(如何实现这个取决于你的目标,客户端是否先发送 all 作业然后接收每个作业的结果工作还是可以混合使用?)。
还有其他方法可以实现它,但对于分布式计算的初学者来说,我认为这是最简单的解决方案(使用线程池,...会使它变得更棘手)。
我正在尝试制作一个使用 websockets 作为其主要界面的分布式 RPC 类型的 Web 应用程序。我想使用排队系统(如 RabbitMQ)来分配通过 websocket 连接请求的昂贵作业。
基本上,流程是这样的:
- 客户端通过 websocket 连接向服务器发送作业
- 服务器会将此消息发送到 RabbitMQ 交换器以供工作人员处理
- 工作人员将执行作业并将作业的结果添加到响应队列
- 服务器会检查响应队列并通过 websocket 连接将作业结果发送回客户端。
据我所知,在服务器上我需要两个共享内存的事件循环。 Websocket 服务器需要监听传入的作业,RabbitMQ 消费者需要监听作业结果以发送回客户端。
适合我在这里使用的技术是什么?我考虑了以下几点:
- 多线程应用程序并在每个线程上启动一个事件循环
- 通过 shm(共享内存)使用两个进程
- 使用两个通过套接字进行通信的进程(可以是 unix 套接字,甚至可以将工作人员设置为特殊的 websocket 客户端)
- 连接到 websocket 服务器的事件循环以检查结果队列
我对 websockets 和分布式计算都不熟悉,所以我真的不知道其中哪一个(或者可能是我没想到的)最适合我。
As far as I can tell, on the server I need two event loops that share memory. The websocket server needs to be listening for incoming jobs, and a RabbitMQ consumer needs to be listening for job results to send back to the clients.
由于您可以让多个客户端同时发送作业,因此您需要一个多线程服务器。除非您的应用程序会为每个客户处理客户。现在有多种实现多线程服务器的方法,每种方法都有自己的 advantages/disadvantages。通过 :
看一下多线程- 每个请求一个线程(+:吞吐量可能最大化,-:线程创建成本高,必须管理并发)
- 每个客户端一个线程(+:线程管理开销更少,-:不能扩展到很多连接,仍然管理并发)
- 一个线程池(+:避免创建线程的开销,可扩展至 N 个并发连接(N = 线程池的大小),- : 管理N个线程之间的并发)
您可以选择上述方法之一 (我会选择每个客户端一个线程,因为它相对容易实现,而且您有可能拥有数以万计的客户端比较小).
请注意,这是一种多线程方法,不是事件驱动方法!但是因为你不限于一个线程(在这种情况下它应该是事件驱动的以便能够处理多个客户端“并发”)我不会选择那个选项实施起来比较困难。 (程序员有时会谈论事件驱动方法中的“回调地狱”)。
这就是我的实现方式(每个客户端一个线程,Java) :
Basically, the flow would go like this:
- A client sends a job via websocket connection to the server
服务器部分:
public class Server {
private static ServerSocket server_skt;
private static ... channel; // channel to communicate with the rabbitMQ distributed priority queue.
// Constructor
Server(int port) {
server_skt = new ServerSocket(port);
/*
* Set up connection with the distributed queue
* channel = ...;
*/
}
public static void main(String argv[]) {
Server server = new Server(5555); // Make server instance
while(true) {
// Always waiting for new clients to connect
try {
System.out.println("Waiting for a client to connect...");
// Spawn new thread for communication with client (hence one thread per client approach)
new CommunicationThread(server_skt.accept(), server.channel).start(); // Will listen for new jobs and send them
} catch(IOException e) {
System.out.println("Exception occured :" + e.getStackTrace());
}
}
}
}
- The server would send this message to a RabbitMQ exchange to be processed by a worker
- ...
- The server would check the response queue and send the result of the job back to the client via websocket connection.
public class CommunicationThread extends Thread {
private Socket client_socket;
private InputStream client_in;
private OutputStream client_out;
private ... channel; // Channel to communicate with rabbitMQ
private ... resultQueue;
public CommunicationThread(Socket socket, ... channel) { // replace ... by type of the rabbitMQ channel
try {
this.client_socket = socket;
this.client_in = client_socket.getInputStream();
this.client_out = client_socket.getOutputStream();
this.channel = channel;
this.resultQueue = ...;
System.out.println("Client connected : " + client_socket.getInetAddress().toString());
} catch(IOException e) {
System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
}
}
public yourJobType readJob() {
// Read input from client (e.g. read a String from "client_in")
// Make a job from it (e.g. map String to a job)
// return the job
}
@Override
public void run() {
while(active) {
/*
* Always listen for incoming jobs (sent by client) and for results (to be sent back to client)
*/
// Read client input (only if available, else it would be blocking!)
if(client_in.available() > 0) {
yourJobType job = readJob();
channel.basicPublish(...); // Send job to rabbitMQ
}
/* Check result queue (THIS is why reading client input MUST be NON-BLOCKING! Else while loop could be blocked on reading input
* and the result queue won't be checked until next job arrives)
*/
ResultType next_result = resultQueue.poll(); // Could be "null" if the queue is empty
if(next_result != null) {
// There is a result
client_out.write(next_result.toByteArray());
client_out.flush();
}
}
client_in.close();
client_out.close();
}
}
注意从结果队列读取时,重要的是您只读取该客户端发送的作业结果。
如果您有一个包含作业结果的结果队列(所有客户端的)并且您检索了如上面代码中的结果,那么该结果可能是另一个客户端作业的结果,因此发送结果回错客户了。
要解决此问题,您可以 poll()
使用过滤器和通配符 (*) 的结果队列,或者为每个客户端设置一个结果队列,从而知道从我们的队列中检索到的结果将被发送到对应客户端。
(*) :您可以为每个客户端分配一个 ID。从客户端接收作业时,将作业与客户端 ID 配对(例如在元组 < clientID, job > 中)并将其放入队列中。并对结果执行相同操作(将结果与客户端 ID 配对并将其放入结果队列中)。然后在 CommunicationThread
的 run()
方法中,您将只需要轮询结果队列以获得 < clientID, ? >.
重要提示:您还必须为每个作业分配一个 ID!因为发送作业 A 然后发送作业 B 并不能保证作业 A 的结果会在作业 B 的结果之前出现。(作业 B 可能比作业 A 耗时更少,因此结果可以在作业之前发送回客户端A的成绩).
(PS : 看你怎么实现worker了(由server一个线程一个worker执行?还是其他进程执行?))
以上答案是一种可能的多线程解决方案。我只讨论了服务器部分,客户端部分应该发送作业并等待结果(如何实现这个取决于你的目标,客户端是否先发送 all 作业然后接收每个作业的结果工作还是可以混合使用?)。
还有其他方法可以实现它,但对于分布式计算的初学者来说,我认为这是最简单的解决方案(使用线程池,...会使它变得更棘手)。