如何在不使用 Zookeeper 的情况下实现分布式优先级队列?
How to implement a distributed priority queue without using Zookeeper?
我想在不使用 Zookeeper 的情况下实现分布式优先级队列?
如果您知道如何在客户端和服务器之间进行通信(例如使用 TCP 套接字),那应该很简单。服务器包含优先级队列的线程安全实现,因此提供 "interface"。客户端连接到服务器并使用此 "interface".
服务器
服务器必须提供优先级队列接口(即支持add
、peek
、poll
、...)。重要的是这些方法必须是线程安全的!所以我们将使用PriorityBlockingQueue
(同步)而不是PriorityQueue
。
public class Server {
private static ServerSocket server_skt;
public PriorityBlockingQueue<Integer> pq;
// Constructor
Server(int port, int pq_size) {
server_skt = new ServerSocket(port);
this.pq = new PriorityBlockingQueue<Integer>(pq_size);
}
public static void main(String argv[]) {
Server server = new Server(5555, 20); // Make server instance
while(true) {
// Always wait for new clients to connect
try {
System.out.println("Waiting for a client to connect...");
// Spawn new thread for communication with client
new CommunicationThread(server_skt.accept(), server.pq).start();
} catch(IOException e) {
System.out.println("Exception occured :" + e.getStackTrace());
}
}
}
}
这就是 CommunicationThread
class 的样子
public class CommunicationThread extends Thread {
private Socket client_socket;
private InputStream client_in;
private OutputStream client_out;
private PriorityBlockingQueue<Integer> pq;
public CommunicationThread(Socket socket, PriorityBlockingQueue<Integer> pq) {
try {
this.client_socket = socket;
this.client_in = client_socket.getInputStream();
this.client_out = client_socket.getOutputStream();
this.pq = pq;
System.out.println("Client connected : " + client_socket.getInetAddress().toString());
} catch(IOException e) {
System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
}
}
@Override
public void run() {
boolean active = true;
while(active) {
int message_number = client_in.read(); // Listen for next integer --> dispatch to correct method
switch(message_number) {
case -1: case 0:
// Will stop the communication between client and server
active = false;
break;
case 1:
// Add
int element_to_add = client_in.read(); // read element to add to the priority queue
pq.add(element_to_add); // Note that a real implementation would send the answer back to the client
break;
case 2:
// Poll (no extra argument to read)
int res = pq.poll();
// Write result to client
client_out.write(res);
client_out.flush();
break;
/*
* IMPLEMENT REST OF INTERFACE (don't worry about synchronization, PriorityBlockingQueue methods are already thread safe)
*/
}
}
client_in.close();
client_out.close();
}
}
这个class正在监听客户端发送的内容。
根据客户端发送的内容,服务器知道要做什么,因此有一个迷你协议。该协议是:当客户端想要调用分布式优先级队列的方法时,他发送一个整数(例如 2 = poll()
)。服务器读取该整数并知道调用哪个方法。
请注意,有时发送一个整数就足够了(参见 poll()
示例),但并非总是如此。想想 add()
的例子,它必须指定一个参数。服务器将从客户端接收 1
(即 add()
)并将读取第二个整数(或必须存储在分布式优先级队列中的任何其他对象)。
客户
基于协议,服务器正在为客户端提供一个接口(例如 0 = 停止通信,1 = add()
,...)。客户端只需连接到服务器并向其发送消息(遵守协议!)。
客户端示例:
public class PQ_Client {
private static Socket skt;
private InputStream in;
private OutputStream out;
private final int _STOP_ = 0, _ADD_ = 1, _POLL_ = 2; // By convention (protocol)
PQ_Client(String ip, int port) {
try {
this.skt = new Socket(ip, port);
this.in = skt.getInputStream();
this.out = skt.getOutputStream();
System.out.println("Connected to distributed priority queue.");
} catch(IOException e) {
System.out.println("Could not connect with the distributed priority queue : " + e.getStackTrace());
}
}
// Sort of stub functions
public void stop() {
out.write(_STOP_);
out.flush();
out.close();
}
public void add(Integer el) {
out.write(_ADD_); // Send wanted operation
out.write(el); // Send argument element
// Real implementation would listen for result here
out.flush();
}
public int poll() {
out.write(_POLL_);
out.flush();
// Listen for answer
return in.read();
}
/*
* Rest of implementation
*/
}
请注意,多亏了这些自制的 "stub functions" 我们可以创建一个 PQ_Client
对象并将其用作优先级队列(client/server 通信隐藏在存根后面。
String ip = "...";
int port = 5555;
PQ_Client pq = new PQ_Client(ip , port);
pq.add(5);
pq.add(2);
pq.add(4);
int res = pq.poll();
请注意,通过使用 RPC(远程过程调用)可以更容易(自动生成存根函数,...)。
事实上,我们上面实现的是一个类似于 RPC 的机制,因为它什么都不做,然后发送消息以调用服务器上的过程(例如 add()
),序列化结果(整数不需要),发送它返回给客户端。
我想在不使用 Zookeeper 的情况下实现分布式优先级队列?
如果您知道如何在客户端和服务器之间进行通信(例如使用 TCP 套接字),那应该很简单。服务器包含优先级队列的线程安全实现,因此提供 "interface"。客户端连接到服务器并使用此 "interface".
服务器
服务器必须提供优先级队列接口(即支持add
、peek
、poll
、...)。重要的是这些方法必须是线程安全的!所以我们将使用PriorityBlockingQueue
(同步)而不是PriorityQueue
。
public class Server {
private static ServerSocket server_skt;
public PriorityBlockingQueue<Integer> pq;
// Constructor
Server(int port, int pq_size) {
server_skt = new ServerSocket(port);
this.pq = new PriorityBlockingQueue<Integer>(pq_size);
}
public static void main(String argv[]) {
Server server = new Server(5555, 20); // Make server instance
while(true) {
// Always wait for new clients to connect
try {
System.out.println("Waiting for a client to connect...");
// Spawn new thread for communication with client
new CommunicationThread(server_skt.accept(), server.pq).start();
} catch(IOException e) {
System.out.println("Exception occured :" + e.getStackTrace());
}
}
}
}
这就是 CommunicationThread
class 的样子
public class CommunicationThread extends Thread {
private Socket client_socket;
private InputStream client_in;
private OutputStream client_out;
private PriorityBlockingQueue<Integer> pq;
public CommunicationThread(Socket socket, PriorityBlockingQueue<Integer> pq) {
try {
this.client_socket = socket;
this.client_in = client_socket.getInputStream();
this.client_out = client_socket.getOutputStream();
this.pq = pq;
System.out.println("Client connected : " + client_socket.getInetAddress().toString());
} catch(IOException e) {
System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
}
}
@Override
public void run() {
boolean active = true;
while(active) {
int message_number = client_in.read(); // Listen for next integer --> dispatch to correct method
switch(message_number) {
case -1: case 0:
// Will stop the communication between client and server
active = false;
break;
case 1:
// Add
int element_to_add = client_in.read(); // read element to add to the priority queue
pq.add(element_to_add); // Note that a real implementation would send the answer back to the client
break;
case 2:
// Poll (no extra argument to read)
int res = pq.poll();
// Write result to client
client_out.write(res);
client_out.flush();
break;
/*
* IMPLEMENT REST OF INTERFACE (don't worry about synchronization, PriorityBlockingQueue methods are already thread safe)
*/
}
}
client_in.close();
client_out.close();
}
}
这个class正在监听客户端发送的内容。
根据客户端发送的内容,服务器知道要做什么,因此有一个迷你协议。该协议是:当客户端想要调用分布式优先级队列的方法时,他发送一个整数(例如 2 = poll()
)。服务器读取该整数并知道调用哪个方法。
请注意,有时发送一个整数就足够了(参见 poll()
示例),但并非总是如此。想想 add()
的例子,它必须指定一个参数。服务器将从客户端接收 1
(即 add()
)并将读取第二个整数(或必须存储在分布式优先级队列中的任何其他对象)。
客户
基于协议,服务器正在为客户端提供一个接口(例如 0 = 停止通信,1 = add()
,...)。客户端只需连接到服务器并向其发送消息(遵守协议!)。
客户端示例:
public class PQ_Client {
private static Socket skt;
private InputStream in;
private OutputStream out;
private final int _STOP_ = 0, _ADD_ = 1, _POLL_ = 2; // By convention (protocol)
PQ_Client(String ip, int port) {
try {
this.skt = new Socket(ip, port);
this.in = skt.getInputStream();
this.out = skt.getOutputStream();
System.out.println("Connected to distributed priority queue.");
} catch(IOException e) {
System.out.println("Could not connect with the distributed priority queue : " + e.getStackTrace());
}
}
// Sort of stub functions
public void stop() {
out.write(_STOP_);
out.flush();
out.close();
}
public void add(Integer el) {
out.write(_ADD_); // Send wanted operation
out.write(el); // Send argument element
// Real implementation would listen for result here
out.flush();
}
public int poll() {
out.write(_POLL_);
out.flush();
// Listen for answer
return in.read();
}
/*
* Rest of implementation
*/
}
请注意,多亏了这些自制的 "stub functions" 我们可以创建一个 PQ_Client
对象并将其用作优先级队列(client/server 通信隐藏在存根后面。
String ip = "...";
int port = 5555;
PQ_Client pq = new PQ_Client(ip , port);
pq.add(5);
pq.add(2);
pq.add(4);
int res = pq.poll();
请注意,通过使用 RPC(远程过程调用)可以更容易(自动生成存根函数,...)。
事实上,我们上面实现的是一个类似于 RPC 的机制,因为它什么都不做,然后发送消息以调用服务器上的过程(例如 add()
),序列化结果(整数不需要),发送它返回给客户端。