从 WebClient 流式传输到 Flux。阻塞超时抛出异常

Stream from WebClient into Flux. Blocking timeout throws exception

我正在 Spring 引导应用程序中使用 WebClient 调用流 API。

我想检索元素,直到我收到 10 个元素,或者 10 秒过去了。我希望请求被阻止,直到其中一个先发生。

        WebClient client = WebClient.builder().baseUrl(URL).build();

        List<Item> items = client
                .get()
                .retrieve()
                .bodyToFlux(Item.class)
                .limitRequest(10)
                .collectList()
                .block(Duration.ofSeconds(10));

如果在超时前检索到 10 个项目,调用很好 returns 并且我有一个包含 10 个项目的填充列表。

但是,如果超时先过去,则会抛出以下异常,并且不会return编辑任何项目。

java.lang.IllegalStateException: Timeout on blocking read for 10000 MILLISECONDS

如何使用 WebClient 读取长达 x 秒的流,然后 return 检索到的项目?

I'd like to retrieve elements until I either received 10 elements, or 10 seconds have elapsed.

听起来 bufferTimeout() 正是您所追求的。

Collect incoming values into multiple List buffers that will be emitted by the returned Flux each time the buffer reaches a maximum size OR the maxTime Duration elapses.

您只需要这些缓冲区之一。在反应式上下文中,您只需调用 next() 生成的通量 - 因为您只想阻塞,您可以调用 blockFirst().

类似于:

List<Item> items = client
        .get()
        .retrieve()
        .bodyToFlux(Item.class)
        .bufferTimeout(10, Duration.ofSeconds(10)) //first parameter is max number of elements, second is timeout
        .blockFirst();