为什么下面的 Java NIO API 与 netty 相比这么慢

Why is the following Java NIO API so slow when compared to netty

我用 Scala 基于 Java NIO API 写了下面这个简单的 Web 服务器。

package zion

import java.net._
import java.nio.ByteBuffer
import java.nio.channels._

/**
  * Minimal single-threaded HTTP responder built on a `Selector`.
  *
  * Every accepted connection immediately receives the same canned response and
  * is then closed; the request itself is never read. The write and the close
  * both happen on the selector thread, so connections are served strictly one
  * at a time.
  */
object NHello {

  import java.io.IOException
  import java.nio.charset.StandardCharsets

  // The response never changes, so it is encoded once. The buffer is read-only
  // because every caller of `helloWorldBytes` shares its content.
  private val responseBytes: ByteBuffer =
    StandardCharsets.ISO_8859_1
      .encode(httpResponse("NHello World\n"))
      .asReadOnlyBuffer()

  /**
    * The complete HTTP response, ISO-8859-1 encoded.
    *
    * @return a buffer positioned at the start of the response; each call
    *         returns an independent position/limit over shared read-only content
    */
  def helloWorldBytes: ByteBuffer = responseBytes.duplicate()

  /**
    * Builds a `200 OK` keep-alive response around `content`.
    *
    * @param content the response body; `Content-Length` is its character count,
    *                which equals the byte count under the ISO-8859-1 encoding
    *                used by [[helloWorldBytes]]
    * @return status line, headers, a blank line and the body, joined by CRLF
    */
  def httpResponse(content: String): String = {
    val rn = "\r\n"
    List(
      "HTTP/1.1 200 OK",
      "Content-Type: text/html",
      "Connection: Keep-Alive",
      s"Content-Length: ${content.length()}",
      rn + content
    ).mkString(rn)
  }

  /**
    * Accepts at most one pending connection, sends the canned response and
    * closes the connection.
    *
    * An I/O failure (for example the client resetting the connection) is
    * reported and swallowed so that it cannot terminate the server loop.
    */
  private def respond(server: ServerSocketChannel): Unit =
    try {
      // accept() returns null when no connection is pending on a non-blocking
      // channel, hence the Option wrapper.
      Option(server.accept()).foreach { channel =>
        try channel.write(helloWorldBytes)
        finally channel.close() // always release the connection, even if the write fails
      }
    } catch {
      case e: IOException =>
        System.err.println(s"Connection failed: ${e.getMessage}")
    }

  /**
    * Binds port 8080 and serves connections until the listening channel is
    * closed by the shutdown hook.
    */
  def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    // Server Socket Channel
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    // Selector
    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    // Registered before the loop starts; placed after it, the hook would never
    // be installed. wakeup() releases a select() that is blocked at that moment.
    sys.addShutdownHook {
      println("Shutting down...")
      serverSocketChannel.close()
      selector.wakeup()
    }

    while (serverSocketChannel.isOpen) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        // The selector never clears the selected-key set itself; each key has
        // to be removed as it is processed.
        iterator.remove()
        // isValid guards against a key cancelled by a concurrent close.
        if (key.isValid && key.isAcceptable) respond(serverSocketChannel)
      }
    }

    selector.close()
    println("Exiting...")
  }
}

使用 wrk 我每秒处理几千个请求。

wrk -t12 -c100 -d10s http://127.0.0.1:8080

与 Netty 相比,这似乎有点太慢了。使用 Netty,我可以获得至少 10 到 15 倍的吞吐量。考虑到 Netty 也是建立在 NIO 之上的,我做错了什么?

是否有一些明显的性能优化是我遗漏的?

问题很多，基本上你是在以“阻塞”的方式使用 NIO。请参考以下文章：

https://bedrin.livejournal.com/204307.html https://crunchify.com/java-nio-non-blocking-io-with-server-client-example-java-nio-bytebuffer-and-channels-selector-java-nio-vs-io/

经过进一步的搜索和分析,我终于找出了上面这段代码中的所有问题。

// Annotated copy of the server's entry point: binds port 8080, registers the
// listening channel with a selector for OP_ACCEPT, and answers every accepted
// connection with `helloWorldBytes` (defined outside this snippet) before
// closing it. The numbered comments mark the calls the surrounding discussion
// identifies as the bottlenecks.
def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    // Listening channel, switched to non-blocking mode after binding.
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    // The selector is only ever asked about accept-readiness of the listener.
    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    while (true) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        if (key.isAcceptable) {
          // NOTE(review): the result of accept() is used without a null check.
          val channel = serverSocketChannel.accept()
          // 1. Blocking write: the loop does not get back to select()/accept()
          //    until this call has returned.
          channel.write(helloWorldBytes)
          // 2. Blocking close: the connection is torn down on this same thread
          //    after every single response.
          channel.close()
        }

      }
      // NOTE(review): remove() is called once, after the inner loop has ended,
      // rather than once per key returned by next().
      iterator.remove()
    }

    // NOTE(review): everything below follows a `while (true)` that contains no
    // break, so the hook registration and the final println are never reached.
    sys.addShutdownHook({
      println("Shutting down...")
      serverSocketChannel.close()
    })

    println("Exiting...")
  }
}

主要问题是

1. 阻塞写入：由于 write 调用是阻塞的，在字节全部写入流之前，我无法接受更多连接。这些连接只能闲置等待，从而影响 Web 服务器的性能。

2. 阻塞关闭：close 调用同样是阻塞的，需要一些时间才能完成。同样，在连接关闭之前，既不会接受新的请求，也不会响应已接受的连接。

关闭连接还有另一个问题：创建新连接的成本很高，而 wrk 等工具在发出一个请求后并不会自动终止连接。在服务器端每处理完一个请求就关闭连接，同样会成为性能杀手，从而影响你的基准测试结果。

下面是一个“高性能”的替代实现：

package zion

import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{
  AsynchronousChannelGroup,
  AsynchronousServerSocketChannel,
  AsynchronousSocketChannel,
  CompletionHandler
}
import java.util.concurrent.{Executors, TimeUnit}

/**
  * Keep-alive HTTP "hello world" server built on the asynchronous NIO channel API.
  *
  * Per connection the flow is accept -> read -> write -> read -> ... and it ends
  * when a read completes with -1 (peer closed) or when an operation fails, at
  * which point the socket is closed. The request bytes are never inspected:
  * any successful read is answered with the same canned response.
  */
object HelloAsyncNIO {
  // Create a thread pool for the socket channel
  // It would be better to have probably only one thread for events.
  // That pool could be shared between the SocketServer and in future SocketClients.
  private val group =
    AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))

  // Socket to accept connections
  private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)

  // Port to be used to connect
  private val PORT = 8081

  // Flag to handle logging
  private val ENABLE_LOGGING = false

  /**
    * Contains utilities to manage read/write on the socket channels
    */
  object NIOBuffer {
    // Imported locally so the object does not depend on the file-level import list.
    import java.nio.CharBuffer
    import java.nio.charset.StandardCharsets

    /**
      * The complete HTTP response, ISO-8859-1 encoded.
      *
      * @return a freshly encoded buffer positioned at the start of the response
      */
    def helloWorldBytes: ByteBuffer = StandardCharsets.ISO_8859_1
      .newEncoder()
      .encode(CharBuffer.wrap(httpResponse("NHello World\n")))

    /**
      * Builds a `200 OK` keep-alive response around `content`.
      *
      * @param content the response body; `Content-Length` is its character
      *                count, which equals the byte count under ISO-8859-1
      * @return status line, headers, a blank line and the body, joined by CRLF
      */
    def httpResponse(content: String): String = {
      val rn = "\r\n"
      List(
        "HTTP/1.1 200 OK",
        "Content-Type: text/html",
        "Connection: Keep-Alive",
        s"Content-Length: ${content.length()}",
        rn + content
      ).mkString(rn)
    }

    // Encoded once and shared by every connection. `helloWorldBytes` already is
    // a ByteBuffer (ByteBuffer.wrap only accepts byte arrays); read-only makes
    // the sharing across handler threads safe.
    private val writeByteBuffer = helloWorldBytes.asReadOnlyBuffer()

    // Scratch space for incoming requests. All duplicates share this memory, so
    // concurrent reads overwrite each other; that is tolerable only because the
    // request content is never looked at.
    private val readByteBuffer = ByteBuffer.allocateDirect(1024 * 2) // 2kb

    /**
      * Starts an asynchronous read on `socket`.
      *
      * @param socket the connection to read from; also passed to `h` as attachment
      * @param h      receives the number of bytes read, or -1 at end of stream
      */
    def read(
        socket: AsynchronousSocketChannel
    )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
      socket.read(readByteBuffer.duplicate(), socket, h)

    /**
      * Asynchronously writes the whole canned response to `socket`.
      *
      * A single asynchronous write may transfer only part of the buffer, so the
      * write is re-issued until nothing remains; only then is `h` completed.
      *
      * @param socket the connection to write to; also passed to `h` as attachment
      * @param h      completed with the total number of bytes written, or failed
      *               with the cause of the first unsuccessful write
      */
    def write(
        socket: AsynchronousSocketChannel
    )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit = {
      val pending = writeByteBuffer.duplicate()
      socket.write(
        pending,
        socket,
        new CompletionHandler[Integer, AsynchronousSocketChannel] {
          override def completed(
              written: Integer,
              channel: AsynchronousSocketChannel
          ): Unit =
            if (pending.hasRemaining) channel.write(pending, channel, this)
            // `pending` started at position 0, so its position is the total written.
            else h.completed(Integer.valueOf(pending.position()), channel)

          override def failed(
              cause: Throwable,
              channel: AsynchronousSocketChannel
          ): Unit =
            h.failed(cause, channel)
        }
      )
    }
  }

  /**
    * Generic async completion handler.
    *
    * @param cb invoked with the result and the attachment on success
    * @tparam V result type of the asynchronous operation
    * @tparam A attachment type
    */
  case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
    override def completed(result: V, attachment: A): Unit =
      cb(result, attachment)

    override def failed(cause: Throwable, attachment: A): Unit = {
      cause match {
        case e: IOException => log(e.getMessage)
        case _              => cause.printStackTrace()
      }
      // A failed read/write ends the connection's read/write cycle; without an
      // explicit close the socket would stay open forever. The listening
      // channel (attachment of the accept handler) is deliberately left alone.
      attachment match {
        case socket: AsynchronousSocketChannel => closeQuietly(socket)
        case _                                 => ()
      }
    }
  }

  // Logging utility
  def log(input: Any*): Unit = {
    if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
  }

  // Closes a connection without letting an IOException escape into the
  // completion-handler thread.
  private def closeQuietly(socket: AsynchronousSocketChannel): Unit =
    try socket.close()
    catch { case e: IOException => log(e.getMessage) }

  private val onAccept
      : Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
    Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
      (socket, server) => {
        log("\nACCEPT")

        // Accept new connections immediately
        server.accept(server, onAccept)

        // Read from the current socket
        NIOBuffer.read(socket)(onRead)
      }
    )

  private val onRead: Handle[Integer, AsynchronousSocketChannel] =
    Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
      log("READ", bytes)

      val count = bytes.intValue

      // EOF, meaning connection can be closed
      if (count == -1) closeQuietly(socket)

      // Some data was read and now we can respond back
      else if (count > 0) NIOBuffer.write(socket)(onWrite)

    })

  private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
    Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
      log("WRITE", bytes)

      // Keep-alive: wait for the next request on the same socket
      NIOBuffer.read(socket)(onRead)
    })

  /**
    * Binds the listening channel to `PORT`, starts accepting connections and
    * parks the main thread until the channel group terminates.
    */
  def main(args: Array[String]): Unit = {

    // Setup socket channel
    serverSocketChannel.bind(new InetSocketAddress(PORT))
    serverSocketChannel.accept(serverSocketChannel, onAccept)

    // Making the main thread wait
    group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}