项目反应器:collectList() 不适用于 Flux.create()
Project reactor: collectList() doesn't work for Flux.create()
下面的示例打印从 1 到 10 的整数和 (7, 8, 9, 10) 的列表
public void streamCollect() {
ConnectableFlux<Integer> connect = Flux.range(1, 10)
.publish();
connect.subscribe(v -> System.out.println("1: " + v));
connect
.filter(number -> number > 6)
.collectList()
.subscribe(v -> System.out.println("4: " + v));
connect.connect();
}
结果:
1:1
1:2
1:3
1:4
1:5
1:6
1:7
1:8
1:9
1:10
4: [7, 8, 9, 10]
下一个示例应该产生相同的结果,但只打印出从 1 到 10 的数字而不是列表。为什么?
public void streamCollect() {
ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.forEach(t -> emitter.next(t));
}).publish();
connect.subscribe(v -> System.out.println("1: " + v));
connect
.filter(number -> number > 6)
.collectList()
.subscribe(v -> System.out.println("4: " + v));
connect.connect();
}
结果:
1:1
1:2
1:3
1:4
1:5
1:6
1:7
1:8
1:9
1:10
collectList 等待 onComplete 信号,您在 create lambda 中从未产生过该信号
下面的示例打印从 1 到 10 的整数和 (7, 8, 9, 10) 的列表
public void streamCollect() {
ConnectableFlux<Integer> connect = Flux.range(1, 10)
.publish();
connect.subscribe(v -> System.out.println("1: " + v));
connect
.filter(number -> number > 6)
.collectList()
.subscribe(v -> System.out.println("4: " + v));
connect.connect();
}
结果:
1:1
1:2
1:3
1:4
1:5
1:6
1:7
1:8
1:9
1:10
4: [7, 8, 9, 10]
下一个示例应该产生相同的结果,但只打印出从 1 到 10 的数字而不是列表。为什么?
public void streamCollect() {
ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.forEach(t -> emitter.next(t));
}).publish();
connect.subscribe(v -> System.out.println("1: " + v));
connect
.filter(number -> number > 6)
.collectList()
.subscribe(v -> System.out.println("4: " + v));
connect.connect();
}
结果:
1:1
1:2
1:3
1:4
1:5
1:6
1:7
1:8
1:9
1:10
collectList 等待 onComplete 信号,您在 create lambda 中从未产生过该信号