顶点处理程序如何工作?

How vertx handlers work?

上周我阅读了有关 vertx 的文档。我不明白的是顶点处理程序是如何工作的?例如

public class Client extends AbstractVerticle{

    @Override
    public void start() throws Exception {
       final HttpClient httpClient = this.vertx.createHttpClient();
       this.vertx.setPeriodic(1000, handler->{
           httpClient.getNow(8080, "localhost", "/", responseHandler -> {
                System.out.println("response");
            });
       });
    }

}

服务器是:

public class JdbcVertx extends AbstractVerticle{

    @Override
    public void start() throws Exception {
        JDBCClient client = JDBCClient.createNonShared(this.vertx, new JsonObject()
                                .put("url", "jdbc:postgresql://localhost:5432/test")
                                .put("user", "user")
                                .put("password", "password")
                                .put("driver_class", "org.postgresql.Driver")
                                .put("max_pool_size", 30));
        this.vertx.createHttpServer()
                .requestHandler(r -> {
                    client.getConnection(handler -> {                     
                        final SQLConnection connection = handler.result();
                        connection.execute(execute(), hndlr -> {
                                connection.close(closehndlr -> {                                 
                                        r.response().putHeader("content-type", "text/html").end("Response");
                                });                                   
                        });
                    });
                }).listen(8080);
    }

    private String execute(){
            return "insert into rubish (name) values ('test')";      
    }
}

(P.S 我知道我首先应该检查处理程序是否成功,然后采取一些措施,但我删除了此检查以简化代码,如果在 30 秒内没有任何响应,我也会从官方文档中删除处理程序中会有异常)

从上面的代码来看,客户端每秒发送请求并且不等待响应,但是它有一个处理程序将在响应到来时执行。

'JdbcVertx' 在端口 8080 上侦听,获取请求,通过睡眠插入数据库,例如 3 秒(我将 1_000_000 行放入数据库并创建索引以减慢插入时间)然后发送响应,因此每个请求都是非阻塞的。

据我所知,vertx 只有一个名为 EventLoop 事件循环的线程,来自 jdbcVertx get reqests 但不会 return 立即响应,而是放置一个将在数据库插入成功时执行的处理程序。事件循环如何知道 IO 操作何时完成。我认为它使用这样的东西

if(Thread.currentThread().getState != 'blocked' && sec != 30){
    this.object.getHandler().execute();
} else if(sec == 30){
 Thread.currentThread.inerrupt();
  } else{
    sec++;
  }

但是我们只有一个线程,当我们有阻塞调用时它没有线程,只有处理程序。

问题是,事件循环如何知道阻塞操作何时结束以及何时执行处理程序

But we have only one thread, and when we have blocking call it doesn't has a thread, only handler. How it work , and why do we need to use Worker Verticle if we can use handlers instead?

处理程序只是在收到事件总线消息或 http 调用时触发的操作。它们不是为您处理可扩展性而设计的。如果你只使用处理程序并且你的操作开始花费很长时间或者你的请求数量有任何增加,你将阻塞你的 Verticle 的事件循环并且会有很多 Thread xxxx has been blocked 警告。

要回答处理程序的工作原理以及事件循环为何不等待处理程序结束而启动另一个处理程序,请参考:https://vertx.io/docs/vertx-core/java/#_reactor_and_multi_reactor

Instead of a single event loop, each Vertx instance maintains several event loops. By default we choose the number based on the number of available cores on the machine, but this can be overridden.

This means a single Vertx process can scale across your server, unlike Node.js.

We call this pattern the Multi-Reactor Pattern to distinguish it from the single threaded reactor pattern.

但在我看来,这还不足以为您解决所有可伸缩性和线程阻塞问题,您也应该阅读以下内容:https://vertx.io/docs/vertx-core/java/#golden_rule

有很多方法可以设计 Verticle,但您必须尽可能保持 non-blocking。在我看来,使用 vert.x 和传统的阻塞方法(例如阻塞 restfull 端点)是不相关的。

就我个人而言,我会按如下方式设计我的垂直线:

  • verticle A:公开一个 restfull 端点并进行回调 url(无论采取何种操作 GET/POST/PUT/PATCH/DELETE)。 Verticle 总是立即响应 202 Accepted 而没有结果,并在事件总线中向 Verticle B 发送消息。

  • verticle B:获取消息,执行操作(最终与事件总线异步调用其他 verticle 并等待回复)并回复调用回调 url.

我会避免使用 worker verticleexecuteBlocking 方法,甚至避免创建线程池。我有特权将我的 Verticle B 的实例(在单独的 pids 中)相乘,这些实例监听相同的 eventbus 集群(最终 Verticle A 带有 http 反向代理)。我们甚至可以想象根据实时请求的数量,拥有可变数量的 Verticle B 实例(在单独的 pids 中)。

P.S : 有时我会使用更强大的消息代理工具,如 Apache Kafka 而不是原生的事件总线(当我需要尊重某种消息,或者当我需要重播某些消息时)。

回答问题:

how event loop know when blocking operation is ended and it's time to execute handler?

根据non-blocking模型,调用时event-loop

connection.execute( execute(), hndlr )

产生一个新线程,执行您的阻塞代码,并在它完成时(类似于 Thread.join())调用 event-loop 线程中的 hndlr 回调。因此,尽管可以执行阻塞代码,但主循环不会被阻塞。