在 Webclient Post 之前检测到空的 Flux Window

Detecting an empty Flux Window before Webclient Post

参考我之前的问题,我是用的;

myFlux
 .window(5)
 .flatMap(window -> client
  .post()
  .body(window, myClass.class)
  .exchange()
  .flatMap(response -> response.bodyToMono)
 )
 .subscribe();

这很好用。然而,在缓慢的一天,5 条消息需要一段时间才能到达,并且 windowwindow 已满之前不会发送任何内容。所以 我切换到 windowTimeout(5, Duration.ofSeconds(5)).

现在,如果没有数据并且超过 Duration,代码将传播一个空 window,这会导致发布一个空数组。

如何检测空 window 而不是 运行 post

不幸的是,如果不阅读整个 Flux 直到完成,就无法知道 Flux 将发出多少项。

由于你的window尺寸比较小,你可以使用.collectList()将Flux发出的所有物品收集到一个List中,然后检查列表是否是在发送请求之前为空。

myFlux
    .windowTimeout(5, Duration.ofSeconds(5))
    .flatMap(window ->
        // collect everything in the window into a list
        window.collectList()
             // ignore empty windows
            .filter(list -> !list.isEmpty())
             // send the request
            .flatMap(list -> client
                .post()
                .body(Flux.fromIterable(list), MyClass.class)
                .exchange()
                .flatMap(response -> response.bodyToMono(MyResponse.class))))