项目反应器:collectList() 之后的 block() 不适用于 Flux.create()
Project reactor: block() after collectList() doesn't work for Flux.create()
当我在下面的示例中对 collectList() 返回的 Mono<> 实例调用 block() 时,我的代码挂起,尽管在发射器上调用了 complete()。
我知道在大多数情况下不建议调用 block()。我正在编写测试代码,我对它的使用似乎是合理的——除了它不起作用之外。
下面的代码是 .
的略微修改版本
ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.forEach(t -> emitter.next(t));
emitter.complete();
}).publish(); // EDIT <- use .replay()
connect.subscribe(v -> System.out.println("1: " + v));
Mono<List<Integer>> mono = connect
.filter(number -> number > 6)
.collectList();
mono.subscribe(v -> System.out.println("4: " + v));
connect.connect();
List<Integer> results = mono.block(); //hangs here
编辑:如上面的评论所示,使用 replay() 而不是 publish() 允许 block() 像我最初预期的那样解析。
当您调用 block()
时,您订阅了您的 Mono
和 ConnectableFlux
,但您没有连接到您的通量源。上一个 connect.connect()
不影响此订阅。
你的最后一行可能是这样的:
List<Integer> results = connect
.autoConnect()
.filter(number -> number > 6)
.collectList()
.block();
当我在下面的示例中对 collectList() 返回的 Mono<> 实例调用 block() 时,我的代码挂起,尽管在发射器上调用了 complete()。
我知道在大多数情况下不建议调用 block()。我正在编写测试代码,我对它的使用似乎是合理的——除了它不起作用之外。
下面的代码是
ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.forEach(t -> emitter.next(t));
emitter.complete();
}).publish(); // EDIT <- use .replay()
connect.subscribe(v -> System.out.println("1: " + v));
Mono<List<Integer>> mono = connect
.filter(number -> number > 6)
.collectList();
mono.subscribe(v -> System.out.println("4: " + v));
connect.connect();
List<Integer> results = mono.block(); //hangs here
编辑:如上面的评论所示,使用 replay() 而不是 publish() 允许 block() 像我最初预期的那样解析。
当您调用 block()
时,您订阅了您的 Mono
和 ConnectableFlux
,但您没有连接到您的通量源。上一个 connect.connect()
不影响此订阅。
你的最后一行可能是这样的:
List<Integer> results = connect
.autoConnect()
.filter(number -> number > 6)
.collectList()
.block();