为什么 Sinks.many().multicast().onBackpressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它
Why is Sinks.many().multicast().onBackpressureBuffer() completing after one of the subscribers cancels the subscription and how to avoid it
我在使用 Sinks.Many<String>
向多个订阅者通知某些事件时遇到了我不理解的行为:
fun main() {
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
此代码显示第一个订阅者获得值 1 和 2,第二个订阅者获得值 2。到目前为止一切顺利:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2
现在,假设第一个订阅者在第一次发射后处理(取消)其订阅,我期望第一个订阅者获得 1,第二个订阅者获得 2:
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
d.dispose()
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()
但是,当第二个订阅者尝试订阅时,流量被视为已完成。为什么会这样?我需要 Sinks.Many 随时可用,以便在不取消的情况下订阅和取消订阅。
我刚遇到同样的问题。
这是autoCancel默认为true造成的。不幸的是 onBackpressureBuffer javadoc 没有提到它。
此行为继承自 EmitterProcessor.create 的记录。
要将 autoCancel 标志设置为 false,必须使用替代方法 onBackpressureBuffer
Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
我在使用 Sinks.Many<String>
向多个订阅者通知某些事件时遇到了我不理解的行为:
fun main() {
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
此代码显示第一个订阅者获得值 1 和 2,第二个订阅者获得值 2。到目前为止一切顺利:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2
现在,假设第一个订阅者在第一次发射后处理(取消)其订阅,我期望第一个订阅者获得 1,第二个订阅者获得 2:
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
d.dispose()
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()
但是,当第二个订阅者尝试订阅时,流量被视为已完成。为什么会这样?我需要 Sinks.Many 随时可用,以便在不取消的情况下订阅和取消订阅。
我刚遇到同样的问题。
这是autoCancel默认为true造成的。不幸的是 onBackpressureBuffer javadoc 没有提到它。
此行为继承自 EmitterProcessor.create 的记录。
要将 autoCancel 标志设置为 false,必须使用替代方法 onBackpressureBuffer
Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);