在 JAVA 中提高 Netty 服务器的性能
Improving the Performance of Netty Server in JAVA
我遇到这样的情况:我的 Netty 服务器将以极快的速度从客户端获取数据。我认为客户端在某种程度上使用了 PUSH 机制来达到这种速度。我不知道 PUSH - POP 机制到底是什么,但我确实觉得客户端正在使用某种机制以非常高的速度发送数据 speed.Now 我的要求是,我写了一个简单的 TCP Netty 服务器来接收数据来自客户端,只是添加到使用 ArrayBlockingQueue 实现的 BlockingQueue。现在,由于 Netty 是基于事件的,接受数据并将其存储在队列中所花费的时间更多,这在客户端引发了一个异常,即 Netty 服务器不是 running.But 我的服务器是 运行 完美,但是接受单条数据入队列的时间比较多。我怎样才能防止这种情况发生?这种情况有最快的队列吗?我 nam 使用 BlockingQueue 作为另一个线程将从队列中获取数据并处理它。所以我需要一个同步队列。我怎样才能提高服务器的性能,或者有什么方法可以以非常高的速度插入数据?我只关心延迟。延迟需要尽可能低。
我的服务器代码:
public class Server implements Runnable {
private final int port;
static String message;
Channel channel;
ChannelFuture channelFuture;
int rcvBuf, sndBuf, lowWaterMark, highWaterMark;
public Server(int port) {
this.port = port;
rcvBuf = 2048;
sndBuf = 2048;
lowWaterMark = 1024;
highWaterMark = 2048;
}
@Override
public void run() {
try {
startServer();
} catch (Exception ex) {
System.err.println("Error in Server : "+ex);
Logger.error(ex.getMessage());
}
}
public void startServer() {
// System.out.println("8888 Server started");
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childOption(ChannelOption.SO_RCVBUF, rcvBuf * 2048)
.childOption(ChannelOption.SO_SNDBUF, sndBuf * 2048)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark * 2048, highWaterMark * 2048))
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
channel = ch;
System.err.println("OMS connected : " + ch.localAddress());
ch.pipeline().addLast(new ReceiveFromOMSDecoder());
}
});
channelFuture = b.bind(port).sync();
this.channel = channelFuture.channel();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException ex) {
System.err.println("Exception raised in SendToOMS class"+ex);
} finally {
group.shutdownGracefully();
}
}
}
我的 ServerHandler 代码:
@Sharable
public class ReceiveFromOMSDecoder extends MessageToMessageDecoder<ByteBuf> {
private Charset charset;
public ReceiveFromOMSDecoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public ReceiveFromOMSDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
String buffer = msg.toString(charset);
if(buffer!=null){
Server.sq.insertStringIntoSendingQueue(buffer); //inserting into queue
}
else{
Logger.error("Null string received"+buffer);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Logger.error(cause.getMessage());
System.err.println(cause);
}
}
三个速成:
- 您似乎没有发送回复。你可能应该。
- 不要阻塞 IO 线程。使用 EventExecutorGroup to dispatch the handling of the incoming payload. i.e. something like ChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler).
- 一般情况下不要阻塞。放弃你的 ArrayBlockingQueue 并查看 JCTools 或其他一些实现以找到非阻塞模拟。
我遇到这样的情况:我的 Netty 服务器将以极快的速度从客户端获取数据。我认为客户端在某种程度上使用了 PUSH 机制来达到这种速度。我不知道 PUSH - POP 机制到底是什么,但我确实觉得客户端正在使用某种机制以非常高的速度发送数据 speed.Now 我的要求是,我写了一个简单的 TCP Netty 服务器来接收数据来自客户端,只是添加到使用 ArrayBlockingQueue 实现的 BlockingQueue。现在,由于 Netty 是基于事件的,接受数据并将其存储在队列中所花费的时间更多,这在客户端引发了一个异常,即 Netty 服务器不是 running.But 我的服务器是 运行 完美,但是接受单条数据入队列的时间比较多。我怎样才能防止这种情况发生?这种情况有最快的队列吗?我 nam 使用 BlockingQueue 作为另一个线程将从队列中获取数据并处理它。所以我需要一个同步队列。我怎样才能提高服务器的性能,或者有什么方法可以以非常高的速度插入数据?我只关心延迟。延迟需要尽可能低。
我的服务器代码:
public class Server implements Runnable {
private final int port;
static String message;
Channel channel;
ChannelFuture channelFuture;
int rcvBuf, sndBuf, lowWaterMark, highWaterMark;
public Server(int port) {
this.port = port;
rcvBuf = 2048;
sndBuf = 2048;
lowWaterMark = 1024;
highWaterMark = 2048;
}
@Override
public void run() {
try {
startServer();
} catch (Exception ex) {
System.err.println("Error in Server : "+ex);
Logger.error(ex.getMessage());
}
}
public void startServer() {
// System.out.println("8888 Server started");
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childOption(ChannelOption.SO_RCVBUF, rcvBuf * 2048)
.childOption(ChannelOption.SO_SNDBUF, sndBuf * 2048)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark * 2048, highWaterMark * 2048))
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
channel = ch;
System.err.println("OMS connected : " + ch.localAddress());
ch.pipeline().addLast(new ReceiveFromOMSDecoder());
}
});
channelFuture = b.bind(port).sync();
this.channel = channelFuture.channel();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException ex) {
System.err.println("Exception raised in SendToOMS class"+ex);
} finally {
group.shutdownGracefully();
}
}
}
我的 ServerHandler 代码:
@Sharable
public class ReceiveFromOMSDecoder extends MessageToMessageDecoder<ByteBuf> {
private Charset charset;
public ReceiveFromOMSDecoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public ReceiveFromOMSDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
String buffer = msg.toString(charset);
if(buffer!=null){
Server.sq.insertStringIntoSendingQueue(buffer); //inserting into queue
}
else{
Logger.error("Null string received"+buffer);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Logger.error(cause.getMessage());
System.err.println(cause);
}
}
三个速成:
- 您似乎没有发送回复。你可能应该。
- 不要阻塞 IO 线程。使用 EventExecutorGroup to dispatch the handling of the incoming payload. i.e. something like ChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler).
- 一般情况下不要阻塞。放弃你的 ArrayBlockingQueue 并查看 JCTools 或其他一些实现以找到非阻塞模拟。