在完成之前开始处理来自服务器的 Flux 响应:这可能吗?

Start processing Flux response from server before completion: is it possible?

我有 2 个 Spring-Boot-Reactive 应用程序,一个服务器和一个客户端;客户端像这样调用服务器:

Flux<Thing> things = thingsApi.listThings(5);

我想将其作为列表供以后使用:

// "extractContent" operation takes 1.5s per "thing"
List<String> thingsContent = things.map(ThingConverter::extractContent)
        .collect(Collectors.toList())
        .block()

在服务器端,端点定义如下所示:

@Override
public Mono<ResponseEntity<Flux<Thing>>> listThings(
        @NotNull @Valid @RequestParam(value = "nbThings") Integer nbThings,
        ServerWebExchange exchange
) {
    // "getThings" operation takes 1.5s per "thing"
    Flux<Thing> things = thingsService.getThings(nbThings);
    return Mono.just(new ResponseEntity<>(things, HttpStatus.OK));
}

签名来自 Open-API 生成的代码(Spring-启动服务器,反应模式)。


我观察到的: 客户端立即跳转到 things.map 但只有在服务器发送完所有“东西”后才开始处理 Flux .

我想要什么:服务器应该在生成“东西”时发送它们,以便客户端可以开始处理它们当他们到达时,处理时间有效减半。

有办法实现吗?我在网上找到了很多关于服务器部分的教程,但是 none 和 java 客户端。我听说过服务器发送的事件,但是我的目标可以使用 returns a Flux 的“经典”Open-API 端点定义来实现吗?

这个问题似乎太复杂了,无法在问题正文中放入一个最小的可行示例;完整代码可供参考 on Github.

编辑:在合并建议的解决方案后将 link 重定向到 main 分支

我已经运行通过改变2点:

  • 首先:我已将您的 /things 端点的响应类型 content 更改为:
content:
  text/event-stream

不要忘记更改默认响应,否则客户端将期望类型 application/json 并等待整个响应。

  • 第二点:我把ThingsService.getThings的return改成了this.getThingsFromExistingStream(你注释掉的方法)

我将更改推送到您 Github 上的新分支 fix-flux-response,因此您可以直接测试它们。