如何在JAVA非阻塞I/O(NIO)API中使用多核?
How to use multiple cores in the JAVA Non Blocking I/O (NIO) API?
JAVA NIO提供了一个API使用NIO架构编写TCP服务器,如下
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.ParseException;
import java.util.*;
public class NIOServer implements Runnable{
private InetAddress addr;
private int port;
private Selector selector;
public NIOServer(InetAddress addr, int port) throws IOException {
this.addr = addr;
this.port = port;
}
public void run(){
try {
startServer();
}catch(IOException ex){
System.out.println(ex.getMessage());
}
}
private void startServer() throws IOException {
this.selector = Selector.open();
ServerSocketChannel serverChannel = serverSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
while (true) {
this.selector.select();
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
}
else if (key.isReadable()) {
this.read(key);
}
else if (key.isWritable()) {
this.write(key);
}
}
}
}
}
这使用单个线程来处理读取、写入和接受等事件。
与每个连接的阻塞线程架构相比,这是更受欢迎的,因为它的非阻塞性质导致最小的缓存未命中、线程开销、低 cpu 迁移。
但是,此架构仅使用单线程。在多进程环境下(比如4核cpu),NIO架构浪费了其他核。有没有我可以使用的设计方法来利用 NIO 架构的所有内核?
NIO2(基于前摄器模式)就是这样一种选择。但是底层架构和原来的NIO有很大的不同。
基本思路是拆分任务:
ExecuterService workers = Executors.newFixedThreadPool(50);
....
while (true) {
this.selector.select();
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
}
else if (key.isReadable()) {
workers.execute(new ReadTaskHandler(key));
}
else if (key.isWritable()) {
workers.execute(new WriteTaskHandler(key));
}
}
}
class ReadTaskHandler implements Runnable {
SelectionKey key;
public ReadTaskHandler(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
int size = 0;
try {
while ((size = channel.read(buffer)) > 0) {
System.out.println(new String(buffer.array()));
buffer.flip();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
其实关于NIO的车型有很多。比如我们也可以使用多线程来处理accept任务(也叫多反应器模型或者多事件循环模型)。
BTW,Netty是一个很棒的event-driven网络应用框架封装javaNIO
JAVA NIO提供了一个API使用NIO架构编写TCP服务器,如下
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.ParseException;
import java.util.*;
public class NIOServer implements Runnable{
private InetAddress addr;
private int port;
private Selector selector;
public NIOServer(InetAddress addr, int port) throws IOException {
this.addr = addr;
this.port = port;
}
public void run(){
try {
startServer();
}catch(IOException ex){
System.out.println(ex.getMessage());
}
}
private void startServer() throws IOException {
this.selector = Selector.open();
ServerSocketChannel serverChannel = serverSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
serverChannel.socket().bind(listenAddr);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
while (true) {
this.selector.select();
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
}
else if (key.isReadable()) {
this.read(key);
}
else if (key.isWritable()) {
this.write(key);
}
}
}
}
}
这使用单个线程来处理读取、写入和接受等事件。
与每个连接的阻塞线程架构相比,这是更受欢迎的,因为它的非阻塞性质导致最小的缓存未命中、线程开销、低 cpu 迁移。
但是,此架构仅使用单线程。在多进程环境下(比如4核cpu),NIO架构浪费了其他核。有没有我可以使用的设计方法来利用 NIO 架构的所有内核?
NIO2(基于前摄器模式)就是这样一种选择。但是底层架构和原来的NIO有很大的不同。
基本思路是拆分任务:
ExecuterService workers = Executors.newFixedThreadPool(50);
....
while (true) {
this.selector.select();
Iterator keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = (SelectionKey) keys.next();
keys.remove();
if (! key.isValid()) {
continue;
}
if (key.isAcceptable()) {
this.accept(key);
}
else if (key.isReadable()) {
workers.execute(new ReadTaskHandler(key));
}
else if (key.isWritable()) {
workers.execute(new WriteTaskHandler(key));
}
}
}
class ReadTaskHandler implements Runnable {
SelectionKey key;
public ReadTaskHandler(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
int size = 0;
try {
while ((size = channel.read(buffer)) > 0) {
System.out.println(new String(buffer.array()));
buffer.flip();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
其实关于NIO的车型有很多。比如我们也可以使用多线程来处理accept任务(也叫多反应器模型或者多事件循环模型)。
BTW,Netty是一个很棒的event-driven网络应用框架封装javaNIO