项目反应器:collectList() 不适用于 Flux.create()

Project reactor: collectList() doesn't work for Flux.create()

下面的示例打印从 1 到 10 的整数和 (7, 8, 9, 10) 的列表

public void streamCollect() {

    ConnectableFlux<Integer> connect = Flux.range(1, 10)
            .publish();

    connect.subscribe(v -> System.out.println("1: " + v));

    connect
            .filter(number -> number > 6)
            .collectList()
            .subscribe(v -> System.out.println("4: " + v));

    connect.connect();
}

结果:

1:1

1:2

1:3

1:4

1:5

1:6

1:7

1:8

1:9

1:10

4: [7, 8, 9, 10]

下一个示例应该产生相同的结果,但只打印出从 1 到 10 的数字而不是列表。为什么?

public void streamCollect() {

    ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {

        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .forEach(t -> emitter.next(t));
    }).publish();

    connect.subscribe(v -> System.out.println("1: " + v));

    connect
            .filter(number -> number > 6)
            .collectList()
            .subscribe(v -> System.out.println("4: " + v));

    connect.connect();
}

结果:

1:1

1:2

1:3

1:4

1:5

1:6

1:7

1:8

1:9

1:10

collectList 等待 onComplete 信号,您在 create lambda 中从未产生过该信号