如何在不使用 Zookeeper 的情况下实现分布式优先级队列?

How to implement a distributed priority queue without using Zookeeper?

我想在不使用 Zookeeper 的情况下实现分布式优先级队列?

如果您知道如何在客户端和服务器之间进行通信(例如使用 TCP 套接字),那应该很简单。服务器包含优先级队列的线程安全实现,因此提供 "interface"。客户端连接到服务器并使用此 "interface".

服务器

服务器必须提供优先级队列接口(即支持addpeekpoll、...)。重要的是这些方法必须是线程安全的!所以我们将使用PriorityBlockingQueue(同步)而不是PriorityQueue

public class Server {
    private static ServerSocket server_skt;
    public PriorityBlockingQueue<Integer> pq;

    // Constructor
    Server(int port, int pq_size) {
        server_skt = new ServerSocket(port);
        this.pq = new PriorityBlockingQueue<Integer>(pq_size);
    }

    public static void main(String argv[]) {
        Server server = new Server(5555, 20); // Make server instance

        while(true) {
            // Always wait for new clients to connect
            try {
                System.out.println("Waiting for a client to connect...");
                // Spawn new thread for communication with client 
                new CommunicationThread(server_skt.accept(), server.pq).start();
            } catch(IOException e) {
                System.out.println("Exception occured :" + e.getStackTrace());
            }
        }
    }
}

这就是 CommunicationThread class 的样子

public class CommunicationThread extends Thread {
    private Socket client_socket;
    private InputStream client_in;
    private OutputStream client_out;
    private PriorityBlockingQueue<Integer> pq;

    public CommunicationThread(Socket socket, PriorityBlockingQueue<Integer> pq) {
        try {
            this.client_socket = socket;
            this.client_in = client_socket.getInputStream();
            this.client_out = client_socket.getOutputStream(); 
            this.pq = pq;

            System.out.println("Client connected : " + client_socket.getInetAddress().toString());
        } catch(IOException e) {
            System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
        }
    }

    @Override
    public void run() {
        boolean active = true;
        while(active) {
            int message_number = client_in.read(); // Listen for next integer --> dispatch to correct method

            switch(message_number) {
            case -1: case 0: 
                // Will stop the communication between client and server
                active = false;
                break;
            case 1:
                // Add
                int element_to_add = client_in.read(); // read element to add to the priority queue
                pq.add(element_to_add); // Note that a real implementation would send the answer back to the client
                break;
            case 2:
                // Poll (no extra argument to read)
                int res = pq.poll();

                // Write result to client
                client_out.write(res);
                client_out.flush();
                break;

            /*
             * IMPLEMENT REST OF INTERFACE (don't worry about synchronization, PriorityBlockingQueue methods are already thread safe)
             */

            }
        }

        client_in.close();
        client_out.close();
    }
}

这个class正在监听客户端发送的内容。 根据客户端发送的内容,服务器知道要做什么,因此有一个迷你协议。该协议是:当客户端想要调用分布式优先级队列的方法时,他发送一个整数(例如 2 = poll())。服务器读取该整数并知道调用哪个方法。

请注意,有时发送一个整数就足够了(参见 poll() 示例),但并非总是如此。想想 add() 的例子,它必须指定一个参数。服务器将从客户端接收 1(即 add())并将读取第二个整数(或必须存储在分布式优先级队列中的任何其他对象)。

客户

基于协议,服务器正在为客户端提供一个接口(例如 0 = 停止通信,1 = add(),...)。客户端只需连接到服务器并向其发送消息(遵守协议!)。

客户端示例:

public class PQ_Client {
    private static Socket skt;
    private InputStream in;
    private OutputStream out;

    private final int _STOP_ = 0, _ADD_ = 1, _POLL_ = 2; // By convention (protocol)

    PQ_Client(String ip, int port) {
        try {
            this.skt = new Socket(ip, port);
            this.in = skt.getInputStream();
            this.out = skt.getOutputStream();

            System.out.println("Connected to distributed priority queue.");
        } catch(IOException e) {
            System.out.println("Could not connect with the distributed priority queue : " + e.getStackTrace());
        }
    }

    // Sort of stub functions
    public void stop() {
        out.write(_STOP_);
        out.flush();
        out.close();
    }

    public void add(Integer el) {
        out.write(_ADD_); // Send wanted operation
        out.write(el);    // Send argument element

        // Real implementation would listen for result here

        out.flush();
    }

    public int poll() {
        out.write(_POLL_);
        out.flush();

        // Listen for answer
        return in.read();
    }

    /*
     * Rest of implementation
     */
}

请注意,多亏了这些自制的 "stub functions" 我们可以创建一个 PQ_Client 对象并将其用作优先级队列(client/server 通信隐藏在存根后面。

String ip = "...";
int port = 5555;

PQ_Client pq = new PQ_Client(ip , port);    
pq.add(5);
pq.add(2);
pq.add(4);

int res = pq.poll();

请注意,通过使用 RPC(远程过程调用)可以更容易(自动生成存根函数,...)。 事实上,我们上面实现的是一个类似于 RPC 的机制,因为它什么都不做,然后发送消息以调用服务器上的过程(例如 add()),序列化结果(整数不需要),发送它返回给客户端。