Vert.x 多线程网络套接字
Vert.x multi-thread web-socket
我有简单的 vert.x 应用程序:
public class Main {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40).setInternalBlockingPoolSize(40));
Router router = Router.router(vertx);
long main_pid = Thread.currentThread().getId();
Handler<ServerWebSocket> wsHandler = serverWebSocket -> {
if(!serverWebSocket.path().equalsIgnoreCase("/ws")){
serverWebSocket.reject();
} else {
long socket_pid = Thread.currentThread().getId();
serverWebSocket.handler(buffer -> {
String str = buffer.getString(0, buffer.length());
long handler_pid = Thread.currentThread().getId();
log.info("Got ws msg: " + str);
String res = String.format("(req:%s)main:%d sock:%d handlr:%d", str, main_pid, socket_pid, handler_pid);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
serverWebSocket.writeFinalTextFrame(res);
});
}
};
vertx
.createHttpServer()
.websocketHandler(wsHandler)
.listen(8080);
}
}
当我将此服务器与多个客户端连接时,我发现它在一个线程中工作。但我想并行处理每个客户端连接。我应该如何更改此代码才能做到这一点?
这个:
new VertxOptions().setWorkerPoolSize(40).setInternalBlockingPoolSize(40)
看起来您正在尝试创建自己的 HTTP 连接池,这可能不是您真正想要的。
Vert.x
和其他基于 event-loop
的非阻塞框架的想法是,我们不尝试 1 thread -> 1 connection
亲和力,而是在当前正在处理的请求时通过事件循环线程正在等待 IO - 例如来自数据库的响应 - 事件循环线程被释放以服务另一个连接。然后,这允许单个事件循环线程以类似并发的方式为多个连接提供服务。
如果你想充分利用你机器上的所有内核,而你只会运行一个verticle
,那么将实例数设置为内核数你部署你的verticle。
IE
Vertx.vertx().deployVerticle("MyVerticle", new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()));
Vert.x 是 reactive framework, which means that it uses a single thread model to handle all your application load. This model is known 比线程模型更好地缩放。
要知道的关键点是,您放入处理程序中的所有代码 必须 永远不会阻塞(就像您的 Thread.sleep
),因为它会阻塞主线程。如果您有阻塞代码(例如 JDBC 调用),您应该将阻塞代码包装在 executingBlocking
处理程序中,例如:
serverWebSocket.handler(buffer -> {
String str = buffer.getString(0, buffer.length());
long handler_pid = Thread.currentThread().getId();
log.info("Got ws msg: " + str);
String res = String.format("(req:%s)main:%d sock:%d handlr:%d", str, main_pid, socket_pid, handler_pid);
vertx.executeBlocking(future -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
serverWebSocket.writeFinalTextFrame(res);
future.complete();
});
});
现在,所有阻塞代码都将 运行 在线程池中的一个线程上,您可以按照其他回复中所示进行配置。
如果您想避免编写所有这些执行阻塞处理程序并且您知道您需要执行多个阻塞调用,那么您应该考虑使用 worker verticle,因为这些将在事件总线级别扩展。
关于多线程的最后一点注意事项是,如果您使用多个线程,您的服务器将不会像单线程一样高效,例如,它无法处理 1000 万个 websocket,因为 1000 万个线程事件发生在现代机器(我们在 2016 年)将使您的 OS 调度器崩溃。
我有简单的 vert.x 应用程序:
public class Main {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40).setInternalBlockingPoolSize(40));
Router router = Router.router(vertx);
long main_pid = Thread.currentThread().getId();
Handler<ServerWebSocket> wsHandler = serverWebSocket -> {
if(!serverWebSocket.path().equalsIgnoreCase("/ws")){
serverWebSocket.reject();
} else {
long socket_pid = Thread.currentThread().getId();
serverWebSocket.handler(buffer -> {
String str = buffer.getString(0, buffer.length());
long handler_pid = Thread.currentThread().getId();
log.info("Got ws msg: " + str);
String res = String.format("(req:%s)main:%d sock:%d handlr:%d", str, main_pid, socket_pid, handler_pid);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
serverWebSocket.writeFinalTextFrame(res);
});
}
};
vertx
.createHttpServer()
.websocketHandler(wsHandler)
.listen(8080);
}
}
当我将此服务器与多个客户端连接时,我发现它在一个线程中工作。但我想并行处理每个客户端连接。我应该如何更改此代码才能做到这一点?
这个:
new VertxOptions().setWorkerPoolSize(40).setInternalBlockingPoolSize(40)
看起来您正在尝试创建自己的 HTTP 连接池,这可能不是您真正想要的。
Vert.x
和其他基于 event-loop
的非阻塞框架的想法是,我们不尝试 1 thread -> 1 connection
亲和力,而是在当前正在处理的请求时通过事件循环线程正在等待 IO - 例如来自数据库的响应 - 事件循环线程被释放以服务另一个连接。然后,这允许单个事件循环线程以类似并发的方式为多个连接提供服务。
如果你想充分利用你机器上的所有内核,而你只会运行一个verticle
,那么将实例数设置为内核数你部署你的verticle。
IE
Vertx.vertx().deployVerticle("MyVerticle", new DeploymentOptions().setInstances(Runtime.getRuntime().availableProcessors()));
Vert.x 是 reactive framework, which means that it uses a single thread model to handle all your application load. This model is known 比线程模型更好地缩放。
要知道的关键点是,您放入处理程序中的所有代码 必须 永远不会阻塞(就像您的 Thread.sleep
),因为它会阻塞主线程。如果您有阻塞代码(例如 JDBC 调用),您应该将阻塞代码包装在 executingBlocking
处理程序中,例如:
serverWebSocket.handler(buffer -> {
String str = buffer.getString(0, buffer.length());
long handler_pid = Thread.currentThread().getId();
log.info("Got ws msg: " + str);
String res = String.format("(req:%s)main:%d sock:%d handlr:%d", str, main_pid, socket_pid, handler_pid);
vertx.executeBlocking(future -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
serverWebSocket.writeFinalTextFrame(res);
future.complete();
});
});
现在,所有阻塞代码都将 运行 在线程池中的一个线程上,您可以按照其他回复中所示进行配置。
如果您想避免编写所有这些执行阻塞处理程序并且您知道您需要执行多个阻塞调用,那么您应该考虑使用 worker verticle,因为这些将在事件总线级别扩展。
关于多线程的最后一点注意事项是,如果您使用多个线程,您的服务器将不会像单线程一样高效,例如,它无法处理 1000 万个 websocket,因为 1000 万个线程事件发生在现代机器(我们在 2016 年)将使您的 OS 调度器崩溃。