Vertx 中的异步自定义方法

Async custom method in Vertx

我正在尝试在 Vert.x 中创建我自己的异步自定义方法,类似于他们的代码:

// call the external service
WebClient client = WebClient.create(vertx);

client.get(8080, "localhost:8080", "/fast").send(ar -> {
    if (ar.succeeded()) {

        HttpResponse<Buffer> response = ar.result();
        System.out.println("response.bodyAsString()" + response.bodyAsString());

    } else {
        System.out.println("Something went wrong " + ar.cause().getMessage());
    }
});

当您 运行 此代码时,线程会在不阻塞所有者线程的情况下休眠,并在端点响应时执行提供的处理程序。

我找到了实现它的方法:"executeBlocking"、"createSharedWorkerExecutor.executeBlocking" 和使用总线,但在所有这些方法中线程都被阻塞了。

我正在寻找不阻塞容器线程的方法,但我没有找到。有一个post:

我试过了,但它也阻塞了线程:

vertx.runOnContext(v -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    handler.handle(Future.succeededFuture("result"));
});

上面 运行 的代码在同一个线程中,但没有 运行 并发,所以我假设线程被阻塞了。

有什么办法吗?

您调用 Thread.sleep() 的方式将使您当前的 JVM 线程进入睡眠状态,从而有效地阻止您当前的 vert.x 事件循环,其中 运行 在同一线程中。这不是 vert.x 中执行阻塞代码的惯用方式。

参见此处:“The Golden Rule - don't block the event loop”。

如果您必须 运行 阻止代码,例如 Thread.sleep(),您应该使用 worker verticle 实现该代码。 Worker Verticles 使用来自不同线程池的 JVM 线程,因此不会阻塞事件循环。

您上面发布的第一个代码示例没有使用阻塞代码,正如您正确描述的那样。它使用异步、非阻塞事件处理程序的惯用方式。

编辑

查看这个关于如何启动一个非常简单的 worker Verticle 的简短示例。

来自 class WorkerVerticle 的代码永远不会阻塞事件循环。在 Verticle 部署期间,您可以通过设置正确的选项使其成为工作人员,如 DeployerVerticle.

中所示
public class DeployerVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("Main verticle has started, let's deploy another...");

        // Deploy it as a worker verticle
        vertx.deployVerticle("io.example.WorkerVerticle", 
                             new DeploymentOptions().setWorker(true));
    }
}


// ----

package io.example;
/**
 * An example of a worker verticle
 */
public class WorkerVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("[Worker] Starting in " +
            Thread.currentThread().getName());

        // consume event bus messages sent to address "sample.data"
        // reply with incoming message transformed to upper case
        vertx.eventBus().<String>consumer("sample.data", message -> {

            Thread.sleep(1000); // will not block the event loop
                                // but only this verticle

            System.out.println("[Worker] Consuming data in " + 
                Thread.currentThread().getName());
            String body = message.body();
            message.reply(body.toUpperCase());
        });
    }
}