Vert.x 做出 vertx.executeBlocking 的承诺

Vert.x make a promise from vertx.executeBlocking

我有四个 I/O 操作:ABCD。它们中的每一个都应该用 vertx.executeBlocking 来执行。我应该有下一个行为:

//PSEUDOCODE
waitForExecuteBlocking(A_OPERATION);
thenWaitForAllExecuteBlocking(`B_OPERATION`, `C_OPERATION`, `D_OPERATION`)
/* do something */

我怎样才能实现这种行为?

我在 Vertx Rx 中找不到解决方案。我不想将我的 *_OPERATION 类 包装为 worker verticles 是有原因的。

我将把我的回答分成两部分。它不依赖于 RxJava,而是仅依赖于常规 Java。
首先,等待A_OPERATION

    Vertx vertx = Vertx.vertx();

    CountDownLatch latch = new CountDownLatch(1);

    Long start = System.currentTimeMillis();

    vertx.deployVerticle(new AbstractVerticle() {

        @Override
        public void start() throws InterruptedException {
            // Just to demonstrate
            Thread.sleep(1000);

            latch.countDown();
        }
    });

    // Always use await with timeout
    latch.await(2, TimeUnit.SECONDS);

    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");

现在来看一个更复杂的例子:

public static void main(String[] args) throws InterruptedException {

    Vertx vertx = Vertx.vertx();

    // This should be equal to number of operations to complete
    CountDownLatch latch = new CountDownLatch(3);

    Long start = System.currentTimeMillis();

    // Start your operations
    vertx.deployVerticle(new BlockingVerticle(latch));
    vertx.deployVerticle(new BlockingVerticle(latch));
    vertx.deployVerticle(new BlockingVerticle(latch));

    // Always use await with timeout
    latch.await(2, TimeUnit.SECONDS);

    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
}


private static class BlockingVerticle extends AbstractVerticle {

    private final CountDownLatch latch;

    public BlockingVerticle(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void start() throws InterruptedException {


        long millis = 1000 + ThreadLocalRandom.current().nextInt(500);

        System.out.println("It will take me " + millis + " to complete");

        // Wait for some random time, but no longer that 1.5 seconds
        Thread.sleep(millis);

        latch.countDown();
    }

}

您应该注意主线程将被阻塞 max(B_OPERATION, C_OPERATION, D_OPERATION) + 几毫秒。

我要添加另一个答案,这次是 Futures。
首先请注意,这些是 Vertx 期货,而不是常规 Java 期货。使用正确的导入。
现在到代码:

// I'm running in main(), so everything is static, just for the sake of example
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) throws InterruptedException {


    long start = System.currentTimeMillis();

    // In your case it should be operationA(), operationB(), etc
    // But I wanted to make the code shorter
    CompositeFuture.all(operationA(), operationA(), operationA()).setHandler((r) -> {
        if (r.succeeded()) {
            // You can even iterate all the results
            List<String> results = r.result().list();
            for (String result : results) {
                System.out.println(result);
            }
            // This will still print max(operationA, operationB, operationC)
            System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
        }
        else {
            System.out.println("Something went wrong");
        }
    });
}


// Return a future, then fulfill it after some time
private static Future<String> operationA() {
    Future<String> future = Future.future();

    long millis = 1000 + ThreadLocalRandom.current().nextInt(500);
    vertx.setTimer(millis, (l) -> {
        future.complete("All is good " + millis);
    });

    return future;
}

使用 Composite Futures 将有助于解决您的问题,compositeFutures 函数有很多 compositeFuture.Join 和 compositeFutures.all ,可以传递期货列表