如何使用 Disruptor 创建一个基于 Java NIO(非阻塞 IO)的 TCP 服务器?

How to make a Java NIO (Non blocking IO) based TCP server using Disruptor?

我正在尝试使用 Disruptor 实现一个 JAVA 基于 NIO 的 TCP 服务器。

Java NIO 以非阻塞方式工作。所有新连接首先到达负责接受连接的服务器套接字(ServerAccept 套接字)。然后根据 selector.select() 选出的键调用相应的处理程序:如果键是可接受的(acceptable),就创建一个新的套接字通道并将其注册到选择器;如果键是可读的(readable),就从通道读取内容,然后注册写操作;如果键是可写的(writable),就把应有的响应写入通道。最简单的基于 NIO 的服务器在单个线程中工作(所有处理程序和选择器都在同一个线程中)。

Java Disruptor是一个高性能的Ring实现,可以用来在不同组件(线程)之间传递消息。

我的问题如下

  1. NIO设计可以使用多线程吗?

  2. 我们可以在单独的线程中运行事件处理程序吗?

  3. 如果可以在单独的线程中运行 eventHandlers,我们如何在线程之间传递 selectionKeys 和 channels?

  4. 能否使用 Java Disruptor 库在主线程(选择器运行所在的线程)和 eventHandler 线程之间传输数据?

  5. 如果可以的话,设计思路是什么? (Disruptor中EventProducer、EventConsumer和RingBuffer的行为是什么?)

您可以使用任何线程消息传递方法制作基于 NIO 的服务器,其中 disruptor 是一种选择。

在这里,你需要解决的问题是如何把工作分摊到不同的线程(而不是在主线程中直接处理请求)。

因此,您可以使用 Disruptor 作为消息传递方法,将从套接字连接读到的缓冲区传递给单独的线程。此外,您需要维护一个共享的并发哈希表(ConcurrentHashMap),用来通知主线程(运行事件循环的线程)响应是否已准备就绪。下面是一个例子。

HttpEvent.java

import java.nio.ByteBuffer;

/**
 * Mutable carrier for one request travelling through the ring buffer.
 *
 * <p>An instance holds three independent values: the byte buffer, the
 * identifier of the request, and a byte count. All of them start at their
 * Java defaults ({@code null}, {@code null}, {@code 0}) and are overwritten
 * through the setters.
 */
public class HttpEvent
{
    private String id;
    private ByteBuffer payload;
    private int byteCount;

    /** @return the identifier last stored with {@link #setRequestId}, or {@code null} */
    public String getRequestId() {
        return this.id;
    }

    /** @param requestId identifier to associate with this event */
    public void setRequestId(final String requestId) {
        this.id = requestId;
    }

    /** @return the buffer last stored with {@link #setBuffer}, or {@code null} */
    public ByteBuffer getBuffer() {
        return this.payload;
    }

    /** @param buffer buffer to carry in this event */
    public void setBuffer(final ByteBuffer buffer) {
        this.payload = buffer;
    }

    /** @return the count last stored with {@link #setNumRead}, initially {@code 0} */
    public int getNumRead() {
        return this.byteCount;
    }

    /** @param numRead number of bytes that were read into the buffer */
    public void setNumRead(final int numRead) {
        this.byteCount = numRead;
    }
}

HttpEventFactory.java

import com.lmax.disruptor.EventFactory;

/**
 * Factory handed to the Disruptor so it can fill its ring buffer with
 * {@link HttpEvent} instances.
 */
public class HttpEventFactory implements EventFactory<HttpEvent>
{
    /**
     * Creates a blank event; its fields are populated later through the setters.
     *
     * @return a freshly constructed {@link HttpEvent}
     */
    public HttpEvent newInstance()
    {
        final HttpEvent blank = new HttpEvent();
        return blank;
    }
}

HttpEventHandler.java

import com.lmax.disruptor.EventHandler;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Dictionary;
import java.util.concurrent.ConcurrentHashMap;

public class HttpEventHandler implements EventHandler<HttpEvent>
{
    private int id;
    private ConcurrentHashMap concurrentHashMap;

    public HttpEventHandler(int id, ConcurrentHashMap concurrentHashMap){
        this.id = id;
        this.concurrentHashMap = concurrentHashMap;

    }

    public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws Exception
    {
        if( sequence % Runtime.getRuntime().availableProcessors()==id){


            String requestId = event.getRequestId();
            ByteBuffer buffer = event.getBuffer();
            int numRead= event.getNumRead();

            ByteBuffer responseBuffer = handleRequest(buffer, numRead);


            this.concurrentHashMap.put(requestId, responseBuffer);

        }
    }

    private ByteBuffer handleRequest(ByteBuffer buffer, int numRead) throws Exception {

        buffer.flip();
        byte[] data = new byte[numRead];
        System.arraycopy(buffer.array(), 0, data, 0, numRead);
        String request = new String(data, "US-ASCII");
        request = request.split("\n")[0].trim();


        String response = serverRequest(request);

        buffer.clear();

        buffer.put(response.getBytes());
        return  buffer;
    }

    private String serverRequest(String request) throws Exception {
        String response = "Sample Response";
        if (request.startsWith("GET")) {

            // http request parsing and response generation should be done here.    


        return  response;
    }
}

HttpEventMain.java

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.lang3.RandomStringUtils;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Single-threaded NIO selector loop that hands request bytes to Disruptor
 * worker threads and writes their responses back to the client.
 *
 * <p>Flow for one connection: {@link #accept} registers the channel for reads;
 * {@link #read} publishes the received bytes through the producer and switches
 * the key to write interest; {@link #write} polls the shared response map and,
 * once a handler has replaced the pending marker, sends the attached buffer and
 * closes the connection.
 *
 * <p>NOTE(review): while a response is pending the key stays registered for
 * {@code OP_WRITE}, so {@code select()} is likely to return immediately and the
 * loop spins until the handler finishes. Avoiding that would need the handlers
 * to wake the selector, which is outside this class.
 */
public class HttpEventMain
{
    /** Placeholder stored in the response map until a handler publishes the real response. */
    private static final String PENDING_RESPONSE = "0";

    private InetAddress addr;
    private int port;
    private Selector selector;
    private HttpEventProducer producer ;
    private ConcurrentHashMap<String, Object> concurrentHashMapResponse;
    /** Maps a connection's key to the id of its in-flight request. */
    private final ConcurrentHashMap<SelectionKey, String> concurrentHashMapKey;
    /** Source of unique request ids; only touched by the selector thread. */
    private long nextRequestId;

    /**
     * @param addr address to bind to; {@code null} binds the wildcard address
     * @param port TCP port to listen on
     * @throws IOException declared for compatibility with existing callers
     */
    public HttpEventMain(InetAddress addr, int port) throws IOException {
        // Fields are assigned directly so no overridable method runs during construction.
        this.addr = addr;
        this.port = port;
        this.concurrentHashMapResponse = new ConcurrentHashMap<>();
        this.concurrentHashMapKey = new ConcurrentHashMap<>();
    }


    /**
     * Wires the Disruptor (one handler per available processor), prints the
     * server details and runs the selector loop on port 4333.
     */
    public static void main(String[] args) throws Exception
    {
        int workerCount = Runtime.getRuntime().availableProcessors();
        System.out.println("----- Running the server on machine with "+workerCount+" cores -----");

        HttpEventMain server = new HttpEventMain(null, 4333);

        HttpEventFactory factory = new HttpEventFactory();

        int bufferSize = 1024;

        Executor executor = Executors.newFixedThreadPool(workerCount); // a thread pool to which we can assign tasks

        Disruptor<HttpEvent> disruptor = new Disruptor<HttpEvent>(factory, bufferSize, executor);

        HttpEventHandler [] handlers = new HttpEventHandler[workerCount];
        for (int i = 0; i < workerCount; i++) {
            handlers[i] = new HttpEventHandler(i, server.getConcurrentHashMapResponse());
        }

        disruptor.handleEventsWith(handlers);
        disruptor.start();

        RingBuffer<HttpEvent> ringBuffer = disruptor.getRingBuffer();

        server.setProducer(new HttpEventProducer(ringBuffer, server.getConcurrentHashMapResponse()));

        try {
            System.out.println("\n====================Server Details====================");
            System.out.println("Server Machine: "+ InetAddress.getLocalHost().getCanonicalHostName());
            System.out.println("Port number: " + server.getPort());

        } catch (UnknownHostException e1) {
            System.err.println("Could not resolve local host name: " + e1.getMessage());
        }

        try {

            server.start();

        } catch (IOException e) {
            System.err.println("Error occured in HttpEventMain:" + e.getMessage());
            // Non-zero status: the server stopped because of a failure.
            System.exit(1);
        }
    }

    /**
     * Opens the selector and the listening channel and dispatches ready keys forever.
     *
     * @throws IOException if the server socket cannot be set up, or selecting/accepting fails
     */
    private void start() throws IOException {
        try (Selector openedSelector = Selector.open();
             ServerSocketChannel serverChannel = ServerSocketChannel.open()) {

            this.selector = openedSelector;
            serverChannel.configureBlocking(false);

            InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
            serverChannel.socket().bind(listenAddr);
            serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

            System.out.println("Server ready. Ctrl-C to stop.");

            while (true) {

                this.selector.select();

                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();

                    if (! key.isValid()) {
                        continue;
                    }

                    if (key.isAcceptable()) {
                        this.accept(key);
                        continue;
                    }

                    // A failure on one client connection must not stop the whole server.
                    try {
                        if (key.isReadable()) {
                            this.read(key);
                        }
                        else if (key.isWritable()) {
                            this.write(key);
                        }
                    } catch (IOException e) {
                        System.err.println("Dropping connection after I/O error: " + e.getMessage());
                        this.closeConnection(key);
                    }
                }
            }
        }
    }

    /**
     * Accepts a pending connection and registers it for reads.
     *
     * @param key the key of the listening channel
     */
    private void accept(SelectionKey key) throws IOException {

        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        if (channel == null) {
            // Non-blocking accept returns null when no connection is pending.
            return;
        }
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Reads up to 8192 bytes from the client, publishes them as a request and
     * switches the key to write interest with the buffer attached.
     *
     * @param key the key of the readable client channel
     */
    private void read(SelectionKey key) throws IOException {

        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        }
        catch (IOException e) {
            // Treated like end-of-stream below: numRead stays -1 and the connection is closed.
            System.err.println("Read failed: " + e.getMessage());
        }

        if (numRead == -1) {
            this.closeConnection(key);
            return;
        }
        if (numRead == 0) {
            // Nothing arrived; stay registered for reads instead of dispatching an empty request.
            return;
        }

        // A counter is unique by construction, so no scan of the maps is needed.
        String requestID = Long.toString(nextRequestId++);

        concurrentHashMapKey.put(key, requestID);

        this.producer.onData(requestID, buffer, numRead);

        channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
    }

    /**
     * Reports whether a handler has published the response for the request
     * bound to {@code key}; on success both map entries are removed.
     *
     * @param key the key of the client channel
     * @return {@code true} once the pending marker has been replaced
     */
    private boolean responseReady(SelectionKey key){

        String requestId = concurrentHashMapKey.get(key);
        if (requestId == null) {
            return false;
        }
        Object response = concurrentHashMapResponse.get(requestId);
        if (response == null || PENDING_RESPONSE.equals(response)) {
            return false;
        }

        concurrentHashMapKey.remove(key);
        concurrentHashMapResponse.remove(requestId);
        return true;
    }

    /**
     * Sends the attached buffer once the response is ready and closes the
     * connection after the last byte has been written.
     *
     * @param key the key of the writable client channel
     */
    private void write(SelectionKey key) throws IOException {

        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer outputBuffer = (ByteBuffer) key.attachment();

        // A key still present in the map has not started sending; once it is
        // removed the buffer is already flipped and partially drained.
        if (concurrentHashMapKey.containsKey(key)) {
            if (!responseReady(key)) {
                return;
            }
            outputBuffer.flip();
        }

        // A non-blocking write may send only part of the buffer; keep the key
        // registered and continue on the next write-ready notification.
        channel.write(outputBuffer);
        if (!outputBuffer.hasRemaining()) {
            channel.close();
            key.cancel();
        }
    }

    /**
     * Forgets any in-flight request of the connection and closes its channel.
     * NOTE(review): a handler that publishes its response after this cleanup
     * would leave one stale entry in the response map.
     */
    private void closeConnection(SelectionKey key) {
        String requestId = concurrentHashMapKey.remove(key);
        if (requestId != null) {
            concurrentHashMapResponse.remove(requestId);
        }
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway; nothing more can be done.
        }
    }

    public HttpEventProducer getProducer() {
        return producer;
    }

    public void setProducer(HttpEventProducer producer) {
        this.producer = producer;
    }

    public ConcurrentHashMap<String, Object> getConcurrentHashMapResponse() {
        return concurrentHashMapResponse;
    }

    public void setConcurrentHashMapResponse(ConcurrentHashMap<String, Object> concurrentHashMapResponse) {
        this.concurrentHashMapResponse = concurrentHashMapResponse;
    }

    public InetAddress getAddr() {
        return addr;
    }

    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }
}

HttpEventProducer.java

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Publishes incoming requests onto the ring buffer and marks each one as
 * pending in the shared response map.
 */
public class HttpEventProducer
{
    private final RingBuffer<HttpEvent> ringBuffer;
    private final ConcurrentHashMap concurrentHashMap;

    /**
     * @param ringBuffer        ring buffer the events are published to
     * @param concurrentHashMap shared map that receives the pending marker per request id
     */
    public HttpEventProducer(RingBuffer<HttpEvent> ringBuffer, ConcurrentHashMap concurrentHashMap)
    {
        this.concurrentHashMap = concurrentHashMap;
        this.ringBuffer = ringBuffer;
    }

    /**
     * Claims the next slot, copies the request data into it and publishes it.
     * The marker {@code "0"} is stored under the request id immediately before
     * the slot is published, whether or not filling the slot succeeded.
     *
     * @param requestId identifier of the request
     * @param buffer    buffer holding the request bytes
     * @param numRead   number of bytes that were read into {@code buffer}
     */
    public void onData(String requestId, ByteBuffer buffer, int numRead)
    {
        final long slot = ringBuffer.next();
        try
        {
            final HttpEvent claimed = ringBuffer.get(slot);
            claimed.setBuffer(buffer);
            claimed.setRequestId(requestId);
            claimed.setNumRead(numRead);
        }
        finally
        {
            // Marker first, then publish: the claimed slot is always released.
            concurrentHashMap.put(requestId, "0");
            ringBuffer.publish(slot);
        }
    }
}