订阅可连接热源时出现CancellationException
CancellationException occurs while subscribing to connectable hot source
我正在使用 reactor-core 3.1.4。
考虑以下代码片段:
Flux<String> flux = Flux.<String>create(sink -> sink.next("test"))
.replay(1)
.refCount();
flux.subscribe(System.out::println);
flux.next().subscribe(System.out::println); // The exception is thrown here!
预期输出:
test
test
实际输出:
test
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1202)
at reactor.core.publisher.OperatorDisposables.dispose(OperatorDisposables.java:132)
at reactor.core.publisher.FluxRefCount$RefCountMonitor.innerCancelled(FluxRefCount.java:132)
at reactor.core.publisher.FluxRefCount$RefCountInner.cancel(FluxRefCount.java:200)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:75)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:177)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:808)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:892)
at reactor.core.publisher.FluxReplay.subscribe(FluxReplay.java:1085)
at reactor.core.publisher.FluxRefCount$RefCountMonitor.subscribe(FluxRefCount.java:116)
at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:77)
at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)
at reactor.core.publisher.Mono.subscribe(Mono.java:3077)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185)
at reactor.core.publisher.Mono.subscribe(Mono.java:3071)
at reactor.core.publisher.Mono.subscribe(Mono.java:3038)
at reactor.core.publisher.Mono.subscribe(Mono.java:2985)
at test.Test.main(Test.java:10)
对我来说,这是 reactor-core 库中的一个错误。我的说法正确还是我遗漏(误解)了什么?
谢谢,
斯特凡
我正在使用 reactor-core 3.1.4。
考虑以下代码片段:
Flux<String> flux = Flux.<String>create(sink -> sink.next("test"))
.replay(1)
.refCount();
flux.subscribe(System.out::println);
flux.next().subscribe(System.out::println); // The exception is thrown here!
预期输出:
test
test
实际输出:
test
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1202)
at reactor.core.publisher.OperatorDisposables.dispose(OperatorDisposables.java:132)
at reactor.core.publisher.FluxRefCount$RefCountMonitor.innerCancelled(FluxRefCount.java:132)
at reactor.core.publisher.FluxRefCount$RefCountInner.cancel(FluxRefCount.java:200)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:75)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:177)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:808)
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:892)
at reactor.core.publisher.FluxReplay.subscribe(FluxReplay.java:1085)
at reactor.core.publisher.FluxRefCount$RefCountMonitor.subscribe(FluxRefCount.java:116)
at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:77)
at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)
at reactor.core.publisher.Mono.subscribe(Mono.java:3077)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185)
at reactor.core.publisher.Mono.subscribe(Mono.java:3071)
at reactor.core.publisher.Mono.subscribe(Mono.java:3038)
at reactor.core.publisher.Mono.subscribe(Mono.java:2985)
at test.Test.main(Test.java:10)
对我来说,这是 reactor-core 库中的一个错误。我的说法正确还是我遗漏(误解)了什么?
谢谢, 斯特凡