将 Streaming Flux 的 WebClient Post 拆分为 JSON 数组

Splitting a WebClient Post of a Streaming Flux into JSON Arrays

我正在使用第三方 REST 控制器,它接受 JSON 对象数组和 returns 单个对象响应。当我从使用有限 Flux 的 WebClient POST 时,代码有效(我假设,因为 Flux 完成)。

但是,当Flux可能是无限的,我怎么办;

  1. POST 在数组块中?
  2. 根据 POSTed 数组捕获响应?
  3. 停止传输 Flux?

这是我的豆子;

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

这是我假设的第三方客户端的样子;

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

这是我的代码。当我 POST flux2 时,完成后发送一个 JSON 数组。但是,当我 POST flux1 时,第一个 take(5) 之后什么也没有发送。 POST 接下来的 5 个块如何?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}
  1. How do I POST in chunks of arrays?

使用 Flux.window 的一种变体将主通量拆分为窗口通量,然后使用窗口通量通过 .flatMap

发送请求
        Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));

        WebClient client = WebClient.create("http://localhost:8080");
        Disposable disposable = flux1
                // 1
                .window(5)
                .flatMap(windowedFlux -> client
                        .post()
                        .uri("/cars")
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(windowedFlux, Car.class)
                        .exchange()
                        // 2
                        .doOnNext(response -> System.out.println(response.statusCode()))
                        .flatMap(response -> response.bodyToMono(...)))
                .subscribe();

        Thread.sleep(10000);

        // 3
        disposable.dispose();

  1. How do I capture the response, per POSTed array?

您可以在 .exchange() 之后通过运算符分析响应。

在我提供的示例中,可以在 doOnNext 运算符中看到响应,但您可以使用任何对 onNext 信号进行操作的运算符,例如 maphandle.

务必完整阅读响应正文以确保连接return返回池(请参阅note)。在这里,我使用了 .bodyToMono,但是任何 .body.toEntity 方法都可以。

  1. Stop the transmission of the Flux?

像您一样使用 subscribe 方法时,您可以使用 returned disposable.dispose().

停止流程

或者,您可以 return 来自 sendCars() 方法的 Flux 并将订阅和处理委托给调用者。

请注意,在我提供的示例中,我只是使用 Thread.sleep() 来模拟等待。在实际应用中,你应该使用更高级的东西,避免Thread.sleep()