WebClient 中的 Flux 与 File.readLines 中的 Flux 行为不同
Flux from WebClient behaves differently than Flux from File.readLines
我必须使用一些 ID 的两个不同来源。一个来自文件,另一个来自 URL。当我从文件行创建 Flux
时,我可以很好地处理它。当我将创建 Flux 的函数与使用 WebClient....get() 的函数切换时,我得到了不同的结果;由于某种原因,WebClient 永远不会被调用。
private Flux<String> retrieveIdListFromFile(String filename) {
try {
return Flux.fromIterable(Files.readAllLines(ResourceUtils.getFile(filename).toPath()));
} catch (IOException e) {
return Flux.error(e);
}
}
这里是 WebClient 部分...
private Flux<String> retrieveIdList() {
return client.get()
.uri(uriBuilder -> uriBuilder.path("capdocuments_201811v2/selectRaw")
.queryParam("q", "-P_Id:[* TO *]")
.queryParam("fq", "DateLastModified:[2010-01-01T00:00:00Z TO 2016-12-31T00:00:00Z]")
.queryParam("fl", "id")
.queryParam("rows", "10")
.queryParam("wt", "csv")
.build())
.retrieve()
.bodyToFlux(String.class);
}
当我对 WebClient 的流量执行 subscribe(System.out::println)
时,没有任何反应。当我执行 blockLast() 时,它起作用了(调用 URL,返回数据)。我不明白为什么,如何纠正这个问题,以及我做错了什么。
使用源自文件的流量,即使订阅也能正常工作。我有点想,Fluxes 是可以互换的...
当我做 retrieveIdList().log().subscribe()
:
INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
当我用 blockLast() 而不是 subscribe() 做同样的事情时:
INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
INFO [reactor-http-nio-4] reactor.Flux.OnAssembly.1 | onNext(id)
.
.
.
从你的问题更新来看,似乎没有任何东西在等待处理完成。我假设这是批处理或 CLI 应用程序,而不是 Web 应用程序?
假设如下:
Flux<User> users = userService.fetchAll();
在 Flux
上调用 blockLast
将触发处理并 block
直到结果出现。
对其调用subscribe
将触发异步处理;我们在您的日志中看到了 subscriber request
元素,但仅此而已。这可能意味着 JVM 在发布任何元素之前退出 - 没有任何东西在等待结果。
如果您有效地编写了一些 CLI/batch 应用程序而不是在 Web 应用程序中处理请求,您可以 block
在最终的反应管道上获得结果。如果您希望将该结果写入文件或将其发送到不同的服务,那么您应该使用反应器操作符对其进行组合。
我必须使用一些 ID 的两个不同来源。一个来自文件,另一个来自 URL。当我从文件行创建 Flux
时,我可以很好地处理它。当我将创建 Flux 的函数与使用 WebClient....get() 的函数切换时,我得到了不同的结果;由于某种原因,WebClient 永远不会被调用。
private Flux<String> retrieveIdListFromFile(String filename) {
try {
return Flux.fromIterable(Files.readAllLines(ResourceUtils.getFile(filename).toPath()));
} catch (IOException e) {
return Flux.error(e);
}
}
这里是 WebClient 部分...
private Flux<String> retrieveIdList() {
return client.get()
.uri(uriBuilder -> uriBuilder.path("capdocuments_201811v2/selectRaw")
.queryParam("q", "-P_Id:[* TO *]")
.queryParam("fq", "DateLastModified:[2010-01-01T00:00:00Z TO 2016-12-31T00:00:00Z]")
.queryParam("fl", "id")
.queryParam("rows", "10")
.queryParam("wt", "csv")
.build())
.retrieve()
.bodyToFlux(String.class);
}
当我对 WebClient 的流量执行 subscribe(System.out::println)
时,没有任何反应。当我执行 blockLast() 时,它起作用了(调用 URL,返回数据)。我不明白为什么,如何纠正这个问题,以及我做错了什么。
使用源自文件的流量,即使订阅也能正常工作。我有点想,Fluxes 是可以互换的...
当我做 retrieveIdList().log().subscribe()
:
INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
当我用 blockLast() 而不是 subscribe() 做同样的事情时:
INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
INFO [reactor-http-nio-4] reactor.Flux.OnAssembly.1 | onNext(id)
.
.
.
从你的问题更新来看,似乎没有任何东西在等待处理完成。我假设这是批处理或 CLI 应用程序,而不是 Web 应用程序?
假设如下:
Flux<User> users = userService.fetchAll();
在 Flux
上调用 blockLast
将触发处理并 block
直到结果出现。
对其调用subscribe
将触发异步处理;我们在您的日志中看到了 subscriber request
元素,但仅此而已。这可能意味着 JVM 在发布任何元素之前退出 - 没有任何东西在等待结果。
如果您有效地编写了一些 CLI/batch 应用程序而不是在 Web 应用程序中处理请求,您可以 block
在最终的反应管道上获得结果。如果您希望将该结果写入文件或将其发送到不同的服务,那么您应该使用反应器操作符对其进行组合。