What is the best way to implement client-side paginated API calls with Vert.x?

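I have a Vert.x web service that needs to make a series of paginated calls to an external API. The external service implements pagination by including a "next" field in each response, which links directly to the next page of data, along with a count of the total number of pages needed to retrieve all of the data. Here is a sample response: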

"pagination": {
  "count": 1000,
  "totalPages": 112,
  "next": "https://some-host.com?next=some-long-alphanumeric-hash"
},
"data": [ ... ]

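After making the first API call, I know the total number of subsequent calls (111 in this case) and the URL for fetching the next page of data. In a synchronous environment, I could do something like this: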

Collection aggregatedResults;
int count = 0;
String nextUrl = "";
while (count <= total pages) {
   make next request
   add the chunk of data from this response to the collection
   store the next URL in local variable
   increment count
}

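My strategy with Vert.x is to use Futures to represent the result of each individual call and then combine them with CompositeFuture.all(). This is roughly what I have so far (some code omitted to save space):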

private String next; // field shared by all of the callbacks below

doFirstCall(client).setHandler(async -> {
    if (async.failed()) {
      // blah
    } else {
        Response response = async.result();
        int totalPages = response.getTotalPages();
        next = response.getNext();

        List<Future> paginatedFutures = IntStream
                .range(0, totalPages - 1)
                .mapToObj(i -> {
                    Promise<Response> promise = Promise.promise();
                    doIndividualPaginatedCall(client, next)
                            .setHandler(call -> {
                                if (call.succeeded()) {
                                    Response chunk = call.result();
                                    next = chunk.getNext(); // store the next URL in global string so it can be accessed within the loop
                                    promise.complete(chunk);
                                } else {
                                    promise.fail(call.cause());
                                }
                            });
                    return promise.future();
                })
                .collect(Collectors.toList());

        CompositeFuture.all(paginatedFutures).setHandler(all -> {
            if (all.succeeded()) {
                // Do something with the aggregated responses
            }
        });
    }
});

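When I run this code, the first call always succeeds and I successfully store the "next" URL. After that, every subsequent paginated call is made with the same URL that I received from the first call, and I see logs like this: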

Call succeeded. i: 16, next: https://blah.com/blah?filter=next(DnF1ZXJ5VGhlbkZldGNoBQAAAAAAlMYVFjdaM2ducHBaVGJHeWV5ZjRzNGRQMXcAAAAAAJTGNhYzcWlRTDEyeVJZS05PeV84QkJlLTVnAAAAAACUxjYWa3UzUkx1MXZURG1Pc2E5WGt5RG9pdwAAAAAAlMY2FnY4TVhXajlqUmMtWEQwWU1naGZFN3cAAAAAAJTGVxZCWWFUV19XR1RXQ05DRkI0NGw4M0xB)
Call succeeded. i: 17, next: https://blah.com/blah?filter=next(DnF1ZXJ5VGhlbkZldGNoBQAAAAAAlMYVFjdaM2ducHBaVGJHeWV5ZjRzNGRQMXcAAAAAAJTGNhYzcWlRTDEyeVJZS05PeV84QkJlLTVnAAAAAACUxjYWa3UzUkx1MXZURG1Pc2E5WGt5RG9pdwAAAAAAlMY2FnY4TVhXajlqUmMtWEQwWU1naGZFN3cAAAAAAJTGVxZCWWFUV19XR1RXQ05DRkI0NGw4M0xB)
Call succeeded. i: 18, next: https://blah.com/blah?filter=next(DnF1ZXJ5VGhlbkZldGNoBQAAAAAAlMYVFjdaM2ducHBaVGJHeWV5ZjRzNGRQMXcAAAAAAJTGNhYzcWlRTDEyeVJZS05PeV84QkJlLTVnAAAAAACUxjYWa3UzUkx1MXZURG1Pc2E5WGt5RG9pdwAAAAAAlMY2FnY4TVhXajlqUmMtWEQwWU1naGZFN3cAAAAAAJTGVxZCWWFUV19XR1RXQ05DRkI0NGw4M0xB)

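TL;DR: How do I perform a series of paginated API calls where the URL changes between calls and is not known until the previous call has finished executing? I have tried CompositeFuture.join, but it has the same effect. I know that for sequential composition you are supposed to use compose(), but how do I compose an unknown number of function calls?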

You are trying to mutate next:

if (call.succeeded()) {
    Response chunk = call.result();
    next = chunk.getNext(); // store the next URL in global string so it can be accessed within the loop
    promise.complete(chunk);
}

but you are actually reusing the same value you obtained the first time:

next = response.getNext();

That's because all of your calls are issued long before any one of them returns.
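To make the ordering problem concrete, here is a small simulation of it. It is only a sketch and does not use Vert.x: a plain queue of Runnables stands in for the event loop, and the class name EagerDispatchDemo, the run method, and the "url-N" strings are made up for illustration. The stream issues every request first, and the response handlers that update next only run afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.stream.IntStream;

class EagerDispatchDemo {
    // Shared mutable field, playing the role of `next` in the question
    static String next = "url-1";

    /** Returns the URLs that the simulated requests were actually issued with. */
    static List<String> run(int calls) {
        next = "url-1";
        List<String> requestedUrls = new ArrayList<>();
        // Hypothetical stand-in for the event loop: handlers wait here until "responses arrive"
        Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

        // Phase 1: the stream dispatches every request immediately
        IntStream.range(0, calls).forEach(i -> {
            requestedUrls.add(next); // the URL is read at dispatch time
            pendingCallbacks.add(() -> next = "url-" + (i + 2)); // response handler, runs later
        });

        // Phase 2: only now do the handlers run and update `next`
        while (!pendingCallbacks.isEmpty()) {
            pendingCallbacks.poll().run();
        }
        return requestedUrls;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [url-1, url-1, url-1]
    }
}
```

Every request sees the value stored after the first call, which matches the repeated URL in the logs above.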

Since you cannot know the value of next until the previous call returns, you have to implement this recursively and drop the map:

doIndividualPaginatedCall(client, next)
   .setHandler(call -> {
        if (call.succeeded()) {
            Response chunk = call.result();
            next = chunk.getNext(); // store the next URL in global string so it can be accessed within the loop
            promise.complete(chunk);
            doIndividualPaginatedCall(client, next);
       } else {
           promise.fail(call.cause());
       }
  });

Note that I haven't actually compiled your code, so you may need to make further changes to get it working.
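One way to express "compose an unknown number of calls" is a method that calls itself from inside the completion callback. The following is a minimal sketch of that idea, not the code from the question: to stay runnable without dependencies it uses the JDK's CompletableFuture rather than a Vert.x Future, with thenCompose playing the role that compose() plays in Vert.x. The Page class, the fetchAll helper, and the fake in-memory API are hypothetical names made up for illustration, and the sketch assumes the last page signals the end with a null next URL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SequentialPagination {

    /** Hypothetical page shape: a chunk of data plus the next URL (null on the last page). */
    static final class Page {
        final List<String> data;
        final String next;

        Page(List<String> data, String next) {
            this.data = data;
            this.next = next;
        }
    }

    /**
     * Fetches pages one after another, starting at url. A request is only
     * issued once the previous response, and therefore its next URL, is known.
     */
    static CompletableFuture<List<String>> fetchAll(
            Function<String, CompletableFuture<Page>> fetchPage, String url, List<String> acc) {
        if (url == null) {
            return CompletableFuture.completedFuture(acc); // no more pages
        }
        return fetchPage.apply(url).thenCompose(page -> {
            acc.addAll(page.data);
            return fetchAll(fetchPage, page.next, acc); // recurse with the fresh URL
        });
    }

    public static void main(String[] args) {
        // Fake in-memory API standing in for the remote service (assumption)
        Map<String, Page> fakeApi = Map.of(
                "url-1", new Page(List.of("a", "b"), "url-2"),
                "url-2", new Page(List.of("c"), "url-3"),
                "url-3", new Page(List.of("d", "e"), null));
        Function<String, CompletableFuture<Page>> fetchPage =
                u -> CompletableFuture.supplyAsync(() -> fakeApi.get(u));

        List<String> all = fetchAll(fetchPage, "url-1", new ArrayList<>()).join();
        System.out.println(all); // prints [a, b, c, d, e]
    }
}
```

Because each request is created inside the previous response's callback, the URL is always read after it has been updated. A failed page request propagates through the chain, so the future returned by fetchAll fails as well.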

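It turns out I misunderstood the API I was connecting to in this question: the "next" field does not change between calls. So this question reduces to "How do I implement async client-side pagination in Vertx, where I do know the URL prior to each paginated call?". I am accepting Alexey's answer because it answers the original question, and I am posting the rough code I ended up using below in case it helps anyone with the same use case: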

// start()
doFirstCall(client).setHandler(async -> {
  if (async.succeeded()) {
    Response response = async.result();
    final int totalPages = response.totalPages();
    final String next = response.next();

    // Fire off 'totalPages' async calls and wait for them to come back
    List<Future> paginatedFutures = IntStream
      .range(0, totalPages)
      .mapToObj(i -> {
         Promise<Response> promise = Promise.promise();
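         doPaginatedCall(client, next).setHandler(call -> {
            if (call.succeeded()) {
              promise.complete(call.result());
            } else {
              // Fail the promise too; otherwise CompositeFuture.join never completes
              promise.fail(call.cause());
            }
         });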
         return promise.future();
      }).collect(Collectors.toList());

   // Wait for all HTTP calls to come back before continuing
   CompositeFuture.join(paginatedFutures).setHandler(all -> {
      if (all.succeeded()) {
         // Do something with all of the aggregated calls
      }
  });
  }
});

private Future<Response> doFirstCall(WebClient client) {
   Promise<Response> promise = Promise.promise();

   // If call succeeded, promise.complete(response), otherwise fail

   return promise.future();
}

private Future<Response> doPaginatedCall(WebClient client, String nextUrl) {
   Promise<Response> promise = Promise.promise();

   // If call succeeded, promise.complete(response), otherwise fail

   return promise.future();
}
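For comparison, the same "fire everything off, then wait" shape can be sketched with only the JDK. This is an illustration under assumptions, not a Vert.x implementation: it uses CompletableFuture.allOf, which, like CompositeFuture.join, waits for every call to finish before completing. The class name ParallelPages and the fetchInParallel helper are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelPages {

    /** Starts `count` calls at once; the result completes once all of them have finished. */
    static <T> CompletableFuture<List<T>> fetchInParallel(
            int count, Supplier<CompletableFuture<T>> call) {
        // Dispatch every call up front; this is fine here because the URL is known in advance
        List<CompletableFuture<T>> futures = IntStream.range(0, count)
                .mapToObj(i -> call.get())
                .collect(Collectors.toList());

        // allOf completes when all futures are done (exceptionally if any of them failed)
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // safe: every future has already completed
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Hypothetical call that always returns the same payload
        Supplier<CompletableFuture<String>> call =
                () -> CompletableFuture.supplyAsync(() -> "page");
        System.out.println(fetchInParallel(3, call).join()); // prints [page, page, page]
    }
}
```

Results come back in dispatch order regardless of which call finishes first, because they are read from the list of futures rather than collected in the callbacks.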