为什么下面的 Java NIO API 与 netty 相比这么慢
Why is the following Java NIO API so slow when compared to netty
我有以下 JAVA 实现来使用 NIO API 创建一个简单的 Web 服务器。
package zion
import java.net._
import java.nio.ByteBuffer
import java.nio.channels._
object NHello {
import java.nio.CharBuffer
import java.nio.charset.Charset
def helloWorldBytes: ByteBuffer = Charset
.forName("ISO-8859-1")
.newEncoder
.encode(CharBuffer.wrap(httpResponse("NHello World\n")))
def httpResponse(content: String): String = {
val rn = "\r\n"
List(
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: Keep-Alive",
s"Content-Length: ${content.length()}",
rn + content
).mkString(rn)
}
def main(args: Array[String]): Unit = {
val port = 8080
val address = new InetSocketAddress(port)
// Server Socket Channel
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(address)
serverSocketChannel.configureBlocking(false)
// Selector
val selector = Selector.open()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
while (true) {
selector.select()
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext) {
val key = iterator.next()
if (key.isAcceptable) {
val channel = serverSocketChannel.accept()
channel.write(helloWorldBytes)
channel.close()
}
}
iterator.remove()
}
sys.addShutdownHook({
println("Shutting down...")
serverSocketChannel.close()
})
println("Exiting...")
}
}
使用 wrk
我每秒处理几千个请求。
wrk -t12 -c100 -d10s http://127.0.0.1:8080
与 Netty 相比,这似乎有点太慢了。使用 Netty,我可以获得至少 10 到 15 倍的吞吐量。考虑到 Netty 也是建立在 NIO 之上的,我做错了什么?
是否有一些明显的性能优化是我遗漏的?
很多,基本上你是在以 'blocking' 的方式使用 NIO。检查这个:
https://bedrin.livejournal.com/204307.html
https://crunchify.com/java-nio-non-blocking-io-with-server-client-example-java-nio-bytebuffer-and-channels-selector-java-nio-vs-io/
经过进一步的搜索和分析,我终于找出了上面这段代码中的所有问题。
def main(args: Array[String]): Unit = {
val port = 8080
val address = new InetSocketAddress(port)
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(address)
serverSocketChannel.configureBlocking(false)
val selector = Selector.open()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
while (true) {
selector.select()
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext) {
val key = iterator.next()
if (key.isAcceptable) {
val channel = serverSocketChannel.accept()
// 1. Blocking Write
channel.write(helloWorldBytes)
// 2. Blocking Close
channel.close()
}
}
iterator.remove()
}
sys.addShutdownHook({
println("Shutting down...")
serverSocketChannel.close()
})
println("Exiting...")
}
}
主要问题是
1.阻止写入
由于阻塞写入调用,除非将字节写入流中,否则我无法接受更多连接。所以那些连接只是闲置,从而影响网络服务器的性能
2。阻止关闭
close
调用也是阻塞的,需要一些时间才能完成。同样,除非连接关闭,否则不会接受新请求,也不会响应已接受的连接。
关闭连接还有另一个问题:创建新连接的成本很高,而且 wrk
等工具不会在发出一个请求后自动终止连接。在服务器上关闭它是在每个请求之后也会成为性能杀手,从而影响您的基准测试。
这是一个替代 "Highly performant" 实现
package zion
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{
AsynchronousChannelGroup,
AsynchronousServerSocketChannel,
AsynchronousSocketChannel,
CompletionHandler
}
import java.util.concurrent.{Executors, TimeUnit}
/**
* This is potentially as fast as it can get using NIO APIs.
*/
object HelloAsyncNIO {
// Create a thread pool for the socket channel
// It would be better to have probably only one thread for events.
// That pool could be shared betwee the SocketServer and in future SocketClients.
private val group =
AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))
// Socket to accept connections
private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)
// Port to be used to connect
private val PORT = 8081
// Flag to handle logging
private val ENABLE_LOGGING = false
/**
* Contains utilities to manage read/write on the socket channels
*/
object NIOBuffer {
def helloWorldBytes: ByteBuffer = Charset
.forName("ISO-8859-1")
.newEncoder
.encode(CharBuffer.wrap(httpResponse("NHello World\n")))
def httpResponse(content: String): String = {
val rn = "\r\n"
List(
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: Keep-Alive",
s"Content-Length: ${content.length()}",
rn + content
).mkString(rn)
}
private val writeByteBuffer = ByteBuffer.wrap(helloWorldBytes)
private val readByteBuffer = ByteBuffer.allocateDirect(1024 * 2) // 2kb
def read(
socket: AsynchronousSocketChannel
)(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
socket.read(readByteBuffer.duplicate(), socket, h)
def write(
socket: AsynchronousSocketChannel
)(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
socket.write(writeByteBuffer.duplicate(), socket, h)
}
// Generic async completion handler
case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
override def completed(result: V, attachment: A): Unit =
cb(result, attachment)
override def failed(cause: Throwable, attachment: A): Unit = {
cause match {
case e: IOException => log(e.getMessage)
case _ => cause.printStackTrace()
}
}
}
// Logging utility
def log(input: Any*): Unit = {
if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
}
private val onAccept
: Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
(socket, server) => {
log("\nACCEPT")
// Accept new connections immediately
server.accept(serverSocketChannel, onAccept)
// Read from the current socket
NIOBuffer.read(socket)(onRead)
}
)
private val onRead: Handle[Integer, AsynchronousSocketChannel] =
Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
log("READ", bytes)
// EOF, meaning connection can be closed
if (bytes == -1) socket.close()
// Some data was read and now we can respond back
else if (bytes > 0) NIOBuffer.write(socket)(onWrite)
})
private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
log("WRITE", bytes)
// Read from the socket
NIOBuffer.read(socket)(onRead)
})
def main(args: Array[String]): Unit = {
// Setup socket channel
serverSocketChannel.bind(new InetSocketAddress(PORT))
serverSocketChannel.accept(serverSocketChannel, onAccept)
// Making the main thread wait
group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
}
}
我有以下 JAVA 实现来使用 NIO API 创建一个简单的 Web 服务器。
package zion
import java.net._
import java.nio.ByteBuffer
import java.nio.channels._
object NHello {
import java.nio.CharBuffer
import java.nio.charset.Charset
def helloWorldBytes: ByteBuffer = Charset
.forName("ISO-8859-1")
.newEncoder
.encode(CharBuffer.wrap(httpResponse("NHello World\n")))
def httpResponse(content: String): String = {
val rn = "\r\n"
List(
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: Keep-Alive",
s"Content-Length: ${content.length()}",
rn + content
).mkString(rn)
}
def main(args: Array[String]): Unit = {
val port = 8080
val address = new InetSocketAddress(port)
// Server Socket Channel
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(address)
serverSocketChannel.configureBlocking(false)
// Selector
val selector = Selector.open()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
while (true) {
selector.select()
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext) {
val key = iterator.next()
if (key.isAcceptable) {
val channel = serverSocketChannel.accept()
channel.write(helloWorldBytes)
channel.close()
}
}
iterator.remove()
}
sys.addShutdownHook({
println("Shutting down...")
serverSocketChannel.close()
})
println("Exiting...")
}
}
使用 wrk
我每秒处理几千个请求。
wrk -t12 -c100 -d10s http://127.0.0.1:8080
与 Netty 相比,这似乎有点太慢了。使用 Netty,我可以获得至少 10 到 15 倍的吞吐量。考虑到 Netty 也是建立在 NIO 之上的,我做错了什么?
是否有一些明显的性能优化是我遗漏的?
很多,基本上你是在以 'blocking' 的方式使用 NIO。检查这个:
https://bedrin.livejournal.com/204307.html https://crunchify.com/java-nio-non-blocking-io-with-server-client-example-java-nio-bytebuffer-and-channels-selector-java-nio-vs-io/
经过进一步的搜索和分析,我终于找出了上面这段代码中的所有问题。
def main(args: Array[String]): Unit = {
val port = 8080
val address = new InetSocketAddress(port)
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(address)
serverSocketChannel.configureBlocking(false)
val selector = Selector.open()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
while (true) {
selector.select()
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext) {
val key = iterator.next()
if (key.isAcceptable) {
val channel = serverSocketChannel.accept()
// 1. Blocking Write
channel.write(helloWorldBytes)
// 2. Blocking Close
channel.close()
}
}
iterator.remove()
}
sys.addShutdownHook({
println("Shutting down...")
serverSocketChannel.close()
})
println("Exiting...")
}
}
主要问题是
1.阻止写入 由于阻塞写入调用,除非将字节写入流中,否则我无法接受更多连接。所以那些连接只是闲置,从而影响网络服务器的性能
2。阻止关闭
close
调用也是阻塞的,需要一些时间才能完成。同样,除非连接关闭,否则不会接受新请求,也不会响应已接受的连接。
关闭连接还有另一个问题:创建新连接的成本很高,而且 wrk
等工具不会在发出一个请求后自动终止连接。在服务器上关闭它是在每个请求之后也会成为性能杀手,从而影响您的基准测试。
这是一个替代 "Highly performant" 实现
package zion
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{
AsynchronousChannelGroup,
AsynchronousServerSocketChannel,
AsynchronousSocketChannel,
CompletionHandler
}
import java.util.concurrent.{Executors, TimeUnit}
/**
* This is potentially as fast as it can get using NIO APIs.
*/
object HelloAsyncNIO {
// Create a thread pool for the socket channel
// It would be better to have probably only one thread for events.
// That pool could be shared betwee the SocketServer and in future SocketClients.
private val group =
AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))
// Socket to accept connections
private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)
// Port to be used to connect
private val PORT = 8081
// Flag to handle logging
private val ENABLE_LOGGING = false
/**
* Contains utilities to manage read/write on the socket channels
*/
object NIOBuffer {
def helloWorldBytes: ByteBuffer = Charset
.forName("ISO-8859-1")
.newEncoder
.encode(CharBuffer.wrap(httpResponse("NHello World\n")))
def httpResponse(content: String): String = {
val rn = "\r\n"
List(
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: Keep-Alive",
s"Content-Length: ${content.length()}",
rn + content
).mkString(rn)
}
private val writeByteBuffer = ByteBuffer.wrap(helloWorldBytes)
private val readByteBuffer = ByteBuffer.allocateDirect(1024 * 2) // 2kb
def read(
socket: AsynchronousSocketChannel
)(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
socket.read(readByteBuffer.duplicate(), socket, h)
def write(
socket: AsynchronousSocketChannel
)(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
socket.write(writeByteBuffer.duplicate(), socket, h)
}
// Generic async completion handler
case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
override def completed(result: V, attachment: A): Unit =
cb(result, attachment)
override def failed(cause: Throwable, attachment: A): Unit = {
cause match {
case e: IOException => log(e.getMessage)
case _ => cause.printStackTrace()
}
}
}
// Logging utility
def log(input: Any*): Unit = {
if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
}
private val onAccept
: Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
(socket, server) => {
log("\nACCEPT")
// Accept new connections immediately
server.accept(serverSocketChannel, onAccept)
// Read from the current socket
NIOBuffer.read(socket)(onRead)
}
)
private val onRead: Handle[Integer, AsynchronousSocketChannel] =
Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
log("READ", bytes)
// EOF, meaning connection can be closed
if (bytes == -1) socket.close()
// Some data was read and now we can respond back
else if (bytes > 0) NIOBuffer.write(socket)(onWrite)
})
private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
log("WRITE", bytes)
// Read from the socket
NIOBuffer.read(socket)(onRead)
})
def main(args: Array[String]): Unit = {
// Setup socket channel
serverSocketChannel.bind(new InetSocketAddress(PORT))
serverSocketChannel.accept(serverSocketChannel, onAccept)
// Making the main thread wait
group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
}
}