WebClient 中的 Flux 与 File.readLines 中的 Flux 行为不同

Flux from WebClient behaves differently than Flux from File.readLines

我必须使用一些 ID 的两个不同来源。一个来自文件,另一个来自 URL。当我从文件行创建 Flux 时,我可以很好地处理它。当我将创建 Flux 的函数与使用 WebClient....get() 的函数切换时,我得到了不同的结果;由于某种原因,WebClient 永远不会被调用。

private Flux<String> retrieveIdListFromFile(String filename) {
  try {
    return Flux.fromIterable(Files.readAllLines(ResourceUtils.getFile(filename).toPath()));
  } catch (IOException e) {
    return Flux.error(e);
  }
}

这里是 WebClient 部分...

private Flux<String> retrieveIdList() {
  return client.get()
      .uri(uriBuilder -> uriBuilder.path("capdocuments_201811v2/selectRaw")
          .queryParam("q", "-P_Id:[* TO *]")
          .queryParam("fq", "DateLastModified:[2010-01-01T00:00:00Z TO 2016-12-31T00:00:00Z]")
          .queryParam("fl", "id")
          .queryParam("rows", "10")
          .queryParam("wt", "csv")
          .build())
      .retrieve()
      .bodyToFlux(String.class);
}

当我对 WebClient 的流量执行 subscribe(System.out::println) 时,没有任何反应。当我执行 blockLast() 时,它起作用了(调用 URL,返回数据)。我不明白为什么,如何纠正这个问题,以及我做错了什么。 使用源自文件的流量,即使订阅也能正常工作。我有点想,Fluxes 是可以互换的...

当我做 retrieveIdList().log().subscribe():

INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)

当我用 blockLast() 而不是 subscribe() 做同样的事情时:

INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
INFO [reactor-http-nio-4] reactor.Flux.OnAssembly.1 | onNext(id)
.
.
.

从你的问题更新来看,似乎没有任何东西在等待处理完成。我假设这是批处理或 CLI 应用程序,而不是 Web 应用程序?

假设如下:

Flux<User> users = userService.fetchAll();

Flux 上调用 blockLast 将触发处理并 block 直到结果出现。

对其调用subscribe将触发异步处理;我们在您的日志中看到了 subscriber request 元素,但仅此而已。这可能意味着 JVM 在发布任何元素之前退出 - 没有任何东西在等待结果。

如果您有效地编写了一些 CLI/batch 应用程序而不是在 Web 应用程序中处理请求,您可以 block 在最终的反应管道上获得结果。如果您希望将该结果写入文件或将其发送到不同的服务,那么您应该使用反应器操作符对其进行组合。