如何在循环中进行一些异步调用

How to make some asynch calls in a loop

在一个循环中,我需要进行一些检查,实际上是在另一个 Verticle 中执行的。在我循环的每次迭代中,我需要检查从那些 Verticle 返回的响应代码,并相应地做出一些决定。换句话说,我需要停止循环的执行并以某种方式等到异步。打电话 returns。但是这样的执行停止违反了 vert.x 的理念,即主线程的执行应该永远不会停止。在Vert.x的范围内怎么办?到目前为止,我不知道该怎么做。任意 suggestions/code samples/urls 到 smth。非常感谢解决方案。

谢谢!

我认为您需要使用 FutureTask 并将它们存储在一个集合中,并在需要时使用 FutureTask.get() 检索结果,这是一个阻塞调用。

这听起来像是反应蒸汽处理的用例。 一般来说,这样的问题可以使用 2 方来解决:

  • 执行任务和returns异步结果的生产者
  • 订阅结果并执行其他任务的处理程序

有一种方法可以将生产者配置为仅在有订阅者时才执行任务。另一方面,订阅者可以决定在某些情况下取消订阅生产者。

我不熟悉反应流的顶点功能。但我将从 RxJava 集成开始 http://vertx.io/docs/vertx-rx/java/

使用 Vert.x 时,您需要少考虑循环,多考虑回调。 您应该使用 eventBus 在顶点之间进行通信。 假设您想要的是类似于此伪代码的东西:

for (int i = 0; i < 4; i++) {
   int result = getVerticleResult();
   System.out.println(result);
}

所以,这只是一个非常基本的例子

class LooperVerticle extends AbstractVerticle {

    private int i = 4;

    @Override
    public void start() throws Exception {
        doWork();
    }

    private void doWork() {
        vertx.eventBus().send("channel", "", (o) -> {
            if (o.succeeded()) {
                System.out.println(o.result().body());
                i--;
                if (i > 0) {
                    doWork();
                }
            }
        });
    }
}

class WorkerVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {

        vertx.eventBus().consumer("channel", (o) -> {
            // Generate some random number
            int num = ThreadLocalRandom.current().nextInt(0, 9);

            // Simulate slowness
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            o.reply(num);
        });
    }
}

测试:

public class EventBusExample {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new LooperVerticle());
        vertx.deployVerticle(new WorkerVerticle());
    }
}