如何使用 Disruptor 创建一个基于 Java NIO(非阻塞 IO)的 TCP 服务器?
How to make a Java NIO (Non blocking IO) based TCP server using Disruptor?
我正在尝试使用 Disruptor 实现一个 JAVA 基于 NIO 的 TCP 服务器。
Java NIO 以非阻塞方式工作。所有新连接首先到达 ServerSocketChannel(接受连接的套接字)。然后根据 selector.select() 选出的键调用相应的处理程序:如果键是可接受的(acceptable),则创建一个新的套接字通道并将其注册到选择器;如果键是可读的,则从通道读取内容,然后注册写事件;如果键是可写的,则把相应的响应内容写入通道。最简单的基于 NIO 的服务器在单个线程中工作(所有处理程序和选择器都在同一个线程中)。
Java Disruptor是一个高性能的Ring实现,可以用来在不同组件(线程)之间传递消息。
我的问题如下
NIO设计可以使用多线程吗?
我们可以在单独的线程中运行事件处理程序吗?
如果可以在单独的线程中运行 eventHandlers,我们如何在线程之间传递 selectionKeys 和 channels?
能否使用 Java Disruptor 库在主线程(选择器所在的线程)和 eventHandler 线程之间传输数据?
如果可以的话,设计思路是什么? (Disruptor中EventProducer、EventConsumer和RingBuffer的行为是什么?)
您可以使用任何线程消息传递方法制作基于 NIO 的服务器,其中 disruptor 是一种选择。
在这种情况下,您需要解决的问题是如何将工作分担到不同的线程(而不是在主线程中直接处理请求)。
因此,您可以使用 Disruptor 作为消息传递方法,将从套接字连接读取到的缓冲区传递给单独的线程。此外,您需要维护一个共享的并发哈希表(ConcurrentHashMap),以通知主线程(运行事件循环的线程)响应是否准备就绪。下面是一个例子。
HttpEvent.java
import java.nio.ByteBuffer;
/**
 * Mutable value holder carried through the ring buffer.
 *
 * <p>It bundles a request identifier, the byte buffer read from the socket and the
 * number of bytes that were read into that buffer. All three values start out at
 * their Java defaults ({@code null}, {@code null}, {@code 0}) and are overwritten
 * through the setters.
 */
public class HttpEvent {

    /** Identifier used to correlate a request with its response. */
    private String requestId;

    /** Buffer holding the raw bytes of the request. */
    private ByteBuffer buffer;

    /** Number of bytes that were read into {@link #buffer}. */
    private int numRead;

    /** @return the identifier of the request, or {@code null} if none was set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId identifier to associate with this event */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the buffer attached to this event, or {@code null} if none was set */
    public ByteBuffer getBuffer() {
        return this.buffer;
    }

    /** @param buffer buffer to attach to this event */
    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** @return the recorded byte count */
    public int getNumRead() {
        return this.numRead;
    }

    /** @param numRead byte count to record */
    public void setNumRead(int numRead) {
        this.numRead = numRead;
    }
}
HttpEventFactory.java
import com.lmax.disruptor.EventFactory;
public class HttpEventFactory implements EventFactory<HttpEvent>
{
public HttpEvent newInstance()
{
return new HttpEvent();
}
}
HttpEventHandler.java
import com.lmax.disruptor.EventHandler;
import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.concurrent.ConcurrentHashMap;
public class HttpEventHandler implements EventHandler<HttpEvent>
{
private int id;
private ConcurrentHashMap concurrentHashMap;
public HttpEventHandler(int id, ConcurrentHashMap concurrentHashMap){
this.id = id;
this.concurrentHashMap = concurrentHashMap;
}
public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws Exception
{
if( sequence % Runtime.getRuntime().availableProcessors()==id){
String requestId = event.getRequestId();
ByteBuffer buffer = event.getBuffer();
int numRead= event.getNumRead();
ByteBuffer responseBuffer = handleRequest(buffer, numRead);
this.concurrentHashMap.put(requestId, responseBuffer);
}
}
private ByteBuffer handleRequest(ByteBuffer buffer, int numRead) throws Exception {
buffer.flip();
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
String request = new String(data, "US-ASCII");
request = request.split("\n")[0].trim();
String response = serverRequest(request);
buffer.clear();
buffer.put(response.getBytes());
return buffer;
}
private String serverRequest(String request) throws Exception {
String response = "Sample Response";
if (request.startsWith("GET")) {
// http request parsing and response generation should be done here.
return response;
}
}
HttpEventMain.java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.lang3.RandomStringUtils;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * Entry point and selector loop of the NIO server.
 *
 * <p>The selector loop runs on the calling thread. Bytes read from a connection are
 * handed to {@link HttpEventProducer}, which publishes them on the Disruptor ring
 * buffer. The loop then polls a shared response map on every writable notification
 * until the placeholder stored by the producer has been replaced by the response.
 */
public class HttpEventMain {

    /** Placeholder value that {@link HttpEventProducer} stores while a response is pending. */
    private static final String PENDING = "0";

    private InetAddress addr;
    private int port;
    private Selector selector;
    private HttpEventProducer producer;

    /** Request id to response (or {@link #PENDING}); raw type kept because it is exposed by the accessors. */
    private ConcurrentHashMap concurrentHashMapResponse;

    /** Selection key to the request id that is in flight for that connection. */
    private ConcurrentHashMap<SelectionKey, String> concurrentHashMapKey;

    /**
     * @param addr address to bind to; {@code null} is passed straight to {@link InetSocketAddress}
     * @param port TCP port to listen on
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public HttpEventMain(InetAddress addr, int port) throws IOException {
        // Assign fields directly instead of calling overridable setters from the constructor.
        this.addr = addr;
        this.port = port;
        this.concurrentHashMapResponse = new ConcurrentHashMap<>();
        this.concurrentHashMapKey = new ConcurrentHashMap<>();
    }

    /**
     * Wires up the Disruptor with one handler per available processor and runs the server
     * on port 4333.
     */
    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("----- Running the server on machine with " + cores + " cores -----");
        HttpEventMain server = new HttpEventMain(null, 4333);
        HttpEventFactory factory = new HttpEventFactory();
        int bufferSize = 1024;
        Executor executor = Executors.newFixedThreadPool(cores); // a thread pool to which we can assign tasks
        Disruptor<HttpEvent> disruptor = new Disruptor<HttpEvent>(factory, bufferSize, executor);
        HttpEventHandler[] handlers = new HttpEventHandler[cores];
        for (int i = 0; i < cores; i++) {
            handlers[i] = new HttpEventHandler(i, server.getConcurrentHashMapResponse());
        }
        disruptor.handleEventsWith(handlers);
        disruptor.start();
        RingBuffer<HttpEvent> ringBuffer = disruptor.getRingBuffer();
        server.setProducer(new HttpEventProducer(ringBuffer, server.getConcurrentHashMapResponse()));
        try {
            System.out.println("\n====================Server Details====================");
            System.out.println("Server Machine: " + InetAddress.getLocalHost().getCanonicalHostName());
            System.out.println("Port number: " + server.getPort());
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("Error occured in HttpEventMain:" + e.getMessage());
            // Non-zero status so callers can tell the server died on an error.
            System.exit(1);
        }
    }

    /**
     * Opens the listening channel and runs the selector loop; does not return normally.
     *
     * @throws IOException if the selector or a channel operation fails
     */
    private void start() throws IOException {
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server ready. Ctrl-C to stop.");
        while (true) {
            this.selector.select();
            Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                // Remove the key so it is not reported again until it is selected anew.
                keys.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    this.accept(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                }
            }
        }
    }

    /** Accepts a pending connection and registers it for read notifications. */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        if (channel == null) {
            // A non-blocking accept returns null when no connection is pending any more.
            return;
        }
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Reads once from the connection, publishes the bytes under a fresh request id and
     * switches the connection to write notifications with the buffer attached.
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        } catch (IOException e) {
            // A failed read leaves numRead at -1, so it is handled like end-of-stream below.
            e.printStackTrace();
        }
        if (numRead == -1) {
            channel.close();
            key.cancel();
            return;
        }
        // Draw random ids until one is found that is not already in flight.
        String requestID = RandomStringUtils.random(15, true, true);
        while (concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)) {
            requestID = RandomStringUtils.random(15, true, true);
        }
        concurrentHashMapKey.put(key, requestID);
        this.producer.onData(requestID, buffer, numRead);
        channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
    }

    /**
     * Checks whether the response for the request bound to {@code key} is available.
     * When it is, both bookkeeping entries for the request are removed.
     *
     * @return {@code true} once the stored value is no longer the pending placeholder
     */
    private boolean responseReady(SelectionKey key) {
        String requestId = concurrentHashMapKey.get(key);
        String response = concurrentHashMapResponse.get(requestId).toString();
        // Compare by value; reference comparison only worked because literals are interned.
        if (PENDING.equals(response)) {
            return false;
        }
        concurrentHashMapKey.remove(key);
        concurrentHashMapResponse.remove(requestId);
        return true;
    }

    /**
     * Sends the response once it is ready and then closes the connection.
     * NOTE(review): a single write call is issued; a partial write would truncate the response.
     */
    private void write(SelectionKey key) throws IOException {
        if (!responseReady(key)) {
            return;
        }
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
        try {
            inputBuffer.flip();
            channel.write(inputBuffer);
        } finally {
            // Release the connection even when the write fails.
            channel.close();
            key.cancel();
        }
    }

    public HttpEventProducer getProducer() {
        return producer;
    }

    public void setProducer(HttpEventProducer producer) {
        this.producer = producer;
    }

    public ConcurrentHashMap getConcurrentHashMapResponse() {
        return concurrentHashMapResponse;
    }

    public void setConcurrentHashMapResponse(ConcurrentHashMap concurrentHashMapResponse) {
        this.concurrentHashMapResponse = concurrentHashMapResponse;
    }

    public InetAddress getAddr() {
        return addr;
    }

    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }
}
HttpEventProducer.java
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Publishes incoming request data on the ring buffer and marks the request as pending
 * in the shared map.
 */
public class HttpEventProducer {

    private final RingBuffer<HttpEvent> ringBuffer;
    private final ConcurrentHashMap concurrentHashMap;

    /**
     * @param ringBuffer        ring buffer the events are published on
     * @param concurrentHashMap shared map that receives the pending marker per request id
     */
    public HttpEventProducer(RingBuffer<HttpEvent> ringBuffer, ConcurrentHashMap concurrentHashMap) {
        this.ringBuffer = ringBuffer;
        this.concurrentHashMap = concurrentHashMap;
    }

    /**
     * Claims the next ring buffer slot, copies the request data into it and publishes it.
     * The marker {@code "0"} is stored under the request id just before publication.
     *
     * @param requestId identifier of the request
     * @param buffer    buffer holding the request bytes
     * @param numRead   number of bytes in {@code buffer}
     */
    public void onData(String requestId, ByteBuffer buffer, int numRead) {
        final long slot = ringBuffer.next();
        try {
            fill(ringBuffer.get(slot), requestId, buffer, numRead);
        } finally {
            // Runs even if filling the slot failed, so a claimed slot is normally published.
            concurrentHashMap.put(requestId, "0");
            ringBuffer.publish(slot);
        }
    }

    /** Copies the request data into the claimed event. */
    private static void fill(HttpEvent target, String requestId, ByteBuffer buffer, int numRead) {
        target.setBuffer(buffer);
        target.setRequestId(requestId);
        target.setNumRead(numRead);
    }
}
我正在尝试使用 Disruptor 实现一个 JAVA 基于 NIO 的 TCP 服务器。
Java NIO 以非阻塞方式工作。所有新连接首先到达 ServerSocketChannel(接受连接的套接字)。然后根据 selector.select() 选出的键调用相应的处理程序:如果键是可接受的(acceptable),则创建一个新的套接字通道并将其注册到选择器;如果键是可读的,则从通道读取内容,然后注册写事件;如果键是可写的,则把相应的响应内容写入通道。最简单的基于 NIO 的服务器在单个线程中工作(所有处理程序和选择器都在同一个线程中)。
Java Disruptor是一个高性能的Ring实现,可以用来在不同组件(线程)之间传递消息。
我的问题如下
NIO设计可以使用多线程吗?
我们可以在单独的线程中运行事件处理程序吗?
如果可以在单独的线程中运行 eventHandlers,我们如何在线程之间传递 selectionKeys 和 channels?
能否使用 Java Disruptor 库在主线程(选择器所在的线程)和 eventHandler 线程之间传输数据?
如果可以的话,设计思路是什么? (Disruptor中EventProducer、EventConsumer和RingBuffer的行为是什么?)
您可以使用任何线程消息传递方法制作基于 NIO 的服务器,其中 disruptor 是一种选择。
在这种情况下,您需要解决的问题是如何将工作分担到不同的线程(而不是在主线程中直接处理请求)。
因此,您可以使用 Disruptor 作为消息传递方法,将从套接字连接读取到的缓冲区传递给单独的线程。此外,您需要维护一个共享的并发哈希表(ConcurrentHashMap),以通知主线程(运行事件循环的线程)响应是否准备就绪。下面是一个例子。
HttpEvent.java
import java.nio.ByteBuffer;
/**
 * Mutable value holder carried through the ring buffer.
 *
 * <p>It bundles a request identifier, the byte buffer read from the socket and the
 * number of bytes that were read into that buffer. All three values start out at
 * their Java defaults ({@code null}, {@code null}, {@code 0}) and are overwritten
 * through the setters.
 */
public class HttpEvent {

    /** Identifier used to correlate a request with its response. */
    private String requestId;

    /** Buffer holding the raw bytes of the request. */
    private ByteBuffer buffer;

    /** Number of bytes that were read into {@link #buffer}. */
    private int numRead;

    /** @return the identifier of the request, or {@code null} if none was set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId identifier to associate with this event */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the buffer attached to this event, or {@code null} if none was set */
    public ByteBuffer getBuffer() {
        return this.buffer;
    }

    /** @param buffer buffer to attach to this event */
    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** @return the recorded byte count */
    public int getNumRead() {
        return this.numRead;
    }

    /** @param numRead byte count to record */
    public void setNumRead(int numRead) {
        this.numRead = numRead;
    }
}
HttpEventFactory.java
import com.lmax.disruptor.EventFactory;
public class HttpEventFactory implements EventFactory<HttpEvent>
{
public HttpEvent newInstance()
{
return new HttpEvent();
}
}
HttpEventHandler.java
import com.lmax.disruptor.EventHandler;
import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.concurrent.ConcurrentHashMap;
public class HttpEventHandler implements EventHandler<HttpEvent>
{
private int id;
private ConcurrentHashMap concurrentHashMap;
public HttpEventHandler(int id, ConcurrentHashMap concurrentHashMap){
this.id = id;
this.concurrentHashMap = concurrentHashMap;
}
public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws Exception
{
if( sequence % Runtime.getRuntime().availableProcessors()==id){
String requestId = event.getRequestId();
ByteBuffer buffer = event.getBuffer();
int numRead= event.getNumRead();
ByteBuffer responseBuffer = handleRequest(buffer, numRead);
this.concurrentHashMap.put(requestId, responseBuffer);
}
}
private ByteBuffer handleRequest(ByteBuffer buffer, int numRead) throws Exception {
buffer.flip();
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
String request = new String(data, "US-ASCII");
request = request.split("\n")[0].trim();
String response = serverRequest(request);
buffer.clear();
buffer.put(response.getBytes());
return buffer;
}
private String serverRequest(String request) throws Exception {
String response = "Sample Response";
if (request.startsWith("GET")) {
// http request parsing and response generation should be done here.
return response;
}
}
HttpEventMain.java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.lang3.RandomStringUtils;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * Entry point and selector loop of the NIO server.
 *
 * <p>The selector loop runs on the calling thread. Bytes read from a connection are
 * handed to {@link HttpEventProducer}, which publishes them on the Disruptor ring
 * buffer. The loop then polls a shared response map on every writable notification
 * until the placeholder stored by the producer has been replaced by the response.
 */
public class HttpEventMain {

    /** Placeholder value that {@link HttpEventProducer} stores while a response is pending. */
    private static final String PENDING = "0";

    private InetAddress addr;
    private int port;
    private Selector selector;
    private HttpEventProducer producer;

    /** Request id to response (or {@link #PENDING}); raw type kept because it is exposed by the accessors. */
    private ConcurrentHashMap concurrentHashMapResponse;

    /** Selection key to the request id that is in flight for that connection. */
    private ConcurrentHashMap<SelectionKey, String> concurrentHashMapKey;

    /**
     * @param addr address to bind to; {@code null} is passed straight to {@link InetSocketAddress}
     * @param port TCP port to listen on
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public HttpEventMain(InetAddress addr, int port) throws IOException {
        // Assign fields directly instead of calling overridable setters from the constructor.
        this.addr = addr;
        this.port = port;
        this.concurrentHashMapResponse = new ConcurrentHashMap<>();
        this.concurrentHashMapKey = new ConcurrentHashMap<>();
    }

    /**
     * Wires up the Disruptor with one handler per available processor and runs the server
     * on port 4333.
     */
    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("----- Running the server on machine with " + cores + " cores -----");
        HttpEventMain server = new HttpEventMain(null, 4333);
        HttpEventFactory factory = new HttpEventFactory();
        int bufferSize = 1024;
        Executor executor = Executors.newFixedThreadPool(cores); // a thread pool to which we can assign tasks
        Disruptor<HttpEvent> disruptor = new Disruptor<HttpEvent>(factory, bufferSize, executor);
        HttpEventHandler[] handlers = new HttpEventHandler[cores];
        for (int i = 0; i < cores; i++) {
            handlers[i] = new HttpEventHandler(i, server.getConcurrentHashMapResponse());
        }
        disruptor.handleEventsWith(handlers);
        disruptor.start();
        RingBuffer<HttpEvent> ringBuffer = disruptor.getRingBuffer();
        server.setProducer(new HttpEventProducer(ringBuffer, server.getConcurrentHashMapResponse()));
        try {
            System.out.println("\n====================Server Details====================");
            System.out.println("Server Machine: " + InetAddress.getLocalHost().getCanonicalHostName());
            System.out.println("Port number: " + server.getPort());
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("Error occured in HttpEventMain:" + e.getMessage());
            // Non-zero status so callers can tell the server died on an error.
            System.exit(1);
        }
    }

    /**
     * Opens the listening channel and runs the selector loop; does not return normally.
     *
     * @throws IOException if the selector or a channel operation fails
     */
    private void start() throws IOException {
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server ready. Ctrl-C to stop.");
        while (true) {
            this.selector.select();
            Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                // Remove the key so it is not reported again until it is selected anew.
                keys.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    this.accept(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                }
            }
        }
    }

    /** Accepts a pending connection and registers it for read notifications. */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        if (channel == null) {
            // A non-blocking accept returns null when no connection is pending any more.
            return;
        }
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Reads once from the connection, publishes the bytes under a fresh request id and
     * switches the connection to write notifications with the buffer attached.
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        } catch (IOException e) {
            // A failed read leaves numRead at -1, so it is handled like end-of-stream below.
            e.printStackTrace();
        }
        if (numRead == -1) {
            channel.close();
            key.cancel();
            return;
        }
        // Draw random ids until one is found that is not already in flight.
        String requestID = RandomStringUtils.random(15, true, true);
        while (concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)) {
            requestID = RandomStringUtils.random(15, true, true);
        }
        concurrentHashMapKey.put(key, requestID);
        this.producer.onData(requestID, buffer, numRead);
        channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
    }

    /**
     * Checks whether the response for the request bound to {@code key} is available.
     * When it is, both bookkeeping entries for the request are removed.
     *
     * @return {@code true} once the stored value is no longer the pending placeholder
     */
    private boolean responseReady(SelectionKey key) {
        String requestId = concurrentHashMapKey.get(key);
        String response = concurrentHashMapResponse.get(requestId).toString();
        // Compare by value; reference comparison only worked because literals are interned.
        if (PENDING.equals(response)) {
            return false;
        }
        concurrentHashMapKey.remove(key);
        concurrentHashMapResponse.remove(requestId);
        return true;
    }

    /**
     * Sends the response once it is ready and then closes the connection.
     * NOTE(review): a single write call is issued; a partial write would truncate the response.
     */
    private void write(SelectionKey key) throws IOException {
        if (!responseReady(key)) {
            return;
        }
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
        try {
            inputBuffer.flip();
            channel.write(inputBuffer);
        } finally {
            // Release the connection even when the write fails.
            channel.close();
            key.cancel();
        }
    }

    public HttpEventProducer getProducer() {
        return producer;
    }

    public void setProducer(HttpEventProducer producer) {
        this.producer = producer;
    }

    public ConcurrentHashMap getConcurrentHashMapResponse() {
        return concurrentHashMapResponse;
    }

    public void setConcurrentHashMapResponse(ConcurrentHashMap concurrentHashMapResponse) {
        this.concurrentHashMapResponse = concurrentHashMapResponse;
    }

    public InetAddress getAddr() {
        return addr;
    }

    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }
}
HttpEventProducer.java
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Publishes incoming request data on the ring buffer and marks the request as pending
 * in the shared map.
 */
public class HttpEventProducer {

    private final RingBuffer<HttpEvent> ringBuffer;
    private final ConcurrentHashMap concurrentHashMap;

    /**
     * @param ringBuffer        ring buffer the events are published on
     * @param concurrentHashMap shared map that receives the pending marker per request id
     */
    public HttpEventProducer(RingBuffer<HttpEvent> ringBuffer, ConcurrentHashMap concurrentHashMap) {
        this.ringBuffer = ringBuffer;
        this.concurrentHashMap = concurrentHashMap;
    }

    /**
     * Claims the next ring buffer slot, copies the request data into it and publishes it.
     * The marker {@code "0"} is stored under the request id just before publication.
     *
     * @param requestId identifier of the request
     * @param buffer    buffer holding the request bytes
     * @param numRead   number of bytes in {@code buffer}
     */
    public void onData(String requestId, ByteBuffer buffer, int numRead) {
        final long slot = ringBuffer.next();
        try {
            fill(ringBuffer.get(slot), requestId, buffer, numRead);
        } finally {
            // Runs even if filling the slot failed, so a claimed slot is normally published.
            concurrentHashMap.put(requestId, "0");
            ringBuffer.publish(slot);
        }
    }

    /** Copies the request data into the claimed event. */
    private static void fill(HttpEvent target, String requestId, ByteBuffer buffer, int numRead) {
        target.setBuffer(buffer);
        target.setRequestId(requestId);
        target.setNumRead(numRead);
    }
}