在 Webclient Post 之前检测到空的 Flux Window
Detecting an empty Flux Window before Webclient Post
参考我之前的问题,我是用的;
myFlux
.window(5)
.flatMap(window -> client
.post()
.body(window, myClass.class)
.exchange()
.flatMap(response -> response.bodyToMono)
)
.subscribe();
这很好用。然而,在缓慢的一天,5 条消息需要一段时间才能到达,并且 window
在 window
已满之前不会发送任何内容。所以
我切换到 windowTimeout(5, Duration.ofSeconds(5))
.
现在,如果没有数据并且超过 Duration
,代码将传播一个空 window
,这会导致发布一个空数组。
如何检测空 window
而不是 运行 post
?
不幸的是,如果不阅读整个 Flux 直到完成,就无法知道 Flux 将发出多少项。
由于你的window尺寸比较小,你可以使用.collectList()
将Flux发出的所有物品收集到一个List
中,然后检查列表是否是在发送请求之前为空。
myFlux
.windowTimeout(5, Duration.ofSeconds(5))
.flatMap(window ->
// collect everything in the window into a list
window.collectList()
// ignore empty windows
.filter(list -> !list.isEmpty())
// send the request
.flatMap(list -> client
.post()
.body(Flux.fromIterable(list), MyClass.class)
.exchange()
.flatMap(response -> response.bodyToMono(MyResponse.class))))
参考我之前的问题
myFlux
.window(5)
.flatMap(window -> client
.post()
.body(window, myClass.class)
.exchange()
.flatMap(response -> response.bodyToMono)
)
.subscribe();
这很好用。然而,在缓慢的一天,5 条消息需要一段时间才能到达,并且 window
在 window
已满之前不会发送任何内容。所以
我切换到 windowTimeout(5, Duration.ofSeconds(5))
.
现在,如果没有数据并且超过 Duration
,代码将传播一个空 window
,这会导致发布一个空数组。
如何检测空 window
而不是 运行 post
?
不幸的是,如果不阅读整个 Flux 直到完成,就无法知道 Flux 将发出多少项。
由于你的window尺寸比较小,你可以使用.collectList()
将Flux发出的所有物品收集到一个List
中,然后检查列表是否是在发送请求之前为空。
myFlux
.windowTimeout(5, Duration.ofSeconds(5))
.flatMap(window ->
// collect everything in the window into a list
window.collectList()
// ignore empty windows
.filter(list -> !list.isEmpty())
// send the request
.flatMap(list -> client
.post()
.body(Flux.fromIterable(list), MyClass.class)
.exchange()
.flatMap(response -> response.bodyToMono(MyResponse.class))))