不了解如何在 kotlin 中使用 flux 订阅
Dont understand how to make flux subscription working in kotlin
我是响应式编程的新手。我希望看到
test provider started
Beat 1000
Beat 2000
在日志中,但只有 test provider started
而没有 Beat
或 on complete
消息。好像漏掉了什么
@Service
class ProviderService {
@PostConstruct
fun start(){
val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
val provider = Provider("test", hb)
}
}
////////////////////////
open class Provider(name: String, heartBests: Flux<HeartBeat>) {
companion object {
val log = LoggerFactory.getLogger(Provider::class.java)!!
}
init {
log.info("$name provider started")
heartBests.doOnComplete { log.info("on complete") }
heartBests.doOnEach { onBeat(it.get().number) }
}
fun onBeat(n: Number){
log.info("Beat $n")
}
}
/////
class HeartBeat(val number: Number)
在您的代码中,来自 'doOnComplete' 的 lambda 从未被调用,因为您创建了无限流。方法 'doOnEach' 因为 'map' 是中间操作(如流中的 map
),它不会调用。
你还有另一个错误,reactive suggests "fluent pattern"。
试试这个简单的例子:
import reactor.core.publisher.Flux
import java.time.Duration
fun main(args: Array<String>) {
val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
println("start")
flux.take(3)
.doOnEach { println("on each $it") }
.map { println("before map");HeartBeat(it.value * 2) }
.doOnNext { println("on next $it") }
.doOnComplete { println("on complete") }
.subscribe { println("subscribe $it") }
Thread.sleep(5000)
}
data class HeartBeat(val value: Long)
这里有三个非常常见的错误:
- 运算符喜欢
doOnEach
return 一个新的 Flux
具有添加行为的实例,因此您需要(重新)分配给变量或使用流畅的样式
- 直到你
subscribe()
(或者它的变体。blockXXX
也 subscribe
在引擎盖下做......)
- 这样的管道是完全异步的,并且由于源
interval
的时间维度而在单独的 Thread
上运行。因此,即使您已订阅,控件也会立即在 init
中 return,这可能会导致主线程和应用程序退出。
我是响应式编程的新手。我希望看到
test provider started
Beat 1000
Beat 2000
在日志中,但只有 test provider started
而没有 Beat
或 on complete
消息。好像漏掉了什么
@Service
class ProviderService {
@PostConstruct
fun start(){
val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
val provider = Provider("test", hb)
}
}
////////////////////////
open class Provider(name: String, heartBests: Flux<HeartBeat>) {
companion object {
val log = LoggerFactory.getLogger(Provider::class.java)!!
}
init {
log.info("$name provider started")
heartBests.doOnComplete { log.info("on complete") }
heartBests.doOnEach { onBeat(it.get().number) }
}
fun onBeat(n: Number){
log.info("Beat $n")
}
}
/////
class HeartBeat(val number: Number)
在您的代码中,来自 'doOnComplete' 的 lambda 从未被调用,因为您创建了无限流。方法 'doOnEach' 因为 'map' 是中间操作(如流中的 map
),它不会调用。
你还有另一个错误,reactive suggests "fluent pattern"。
试试这个简单的例子:
import reactor.core.publisher.Flux
import java.time.Duration
fun main(args: Array<String>) {
val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
println("start")
flux.take(3)
.doOnEach { println("on each $it") }
.map { println("before map");HeartBeat(it.value * 2) }
.doOnNext { println("on next $it") }
.doOnComplete { println("on complete") }
.subscribe { println("subscribe $it") }
Thread.sleep(5000)
}
data class HeartBeat(val value: Long)
这里有三个非常常见的错误:
- 运算符喜欢
doOnEach
return 一个新的Flux
具有添加行为的实例,因此您需要(重新)分配给变量或使用流畅的样式 - 直到你
subscribe()
(或者它的变体。blockXXX
也subscribe
在引擎盖下做......) - 这样的管道是完全异步的,并且由于源
interval
的时间维度而在单独的Thread
上运行。因此,即使您已订阅,控件也会立即在init
中 return,这可能会导致主线程和应用程序退出。