项目反应器:collectList() 之后的 block() 不适用于 Flux.create()

Project reactor: block() after collectList() doesn't work for Flux.create()

当我在下面的示例中对 collectList() 返回的 Mono<> 实例调用 block() 时,我的代码挂起,尽管在发射器上调用了 complete()。

我知道在大多数情况下不建议调用 block()。我正在编写测试代码,我对它的使用似乎是合理的——除了它不起作用之外。

下面的代码是 .

的略微修改版本
    ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
          .forEach(t -> emitter.next(t));
        emitter.complete();
    }).publish(); // EDIT <- use .replay() 

    connect.subscribe(v -> System.out.println("1: " + v));

    Mono<List<Integer>> mono = connect
            .filter(number -> number > 6)
            .collectList();

    mono.subscribe(v -> System.out.println("4: " + v));

    connect.connect();
    List<Integer> results = mono.block(); //hangs here

编辑:如上面的评论所示,使用 replay() 而不是 publish() 允许 block() 像我最初预期的那样解析。

当您调用 block() 时,您订阅了您的 MonoConnectableFlux,但您没有连接到您的通量源。上一个 connect.connect() 不影响此订阅。

你的最后一行可能是这样的:

List<Integer> results = connect
        .autoConnect()
        .filter(number -> number > 6)
        .collectList()
        .block();