不了解如何在 kotlin 中使用 flux 订阅

Dont understand how to make flux subscription working in kotlin

我是响应式编程的新手。我希望看到

test provider started
Beat 1000
Beat 2000

在日志中,但只有 test provider started 而没有 Beaton complete 消息。好像漏掉了什么

@Service
class ProviderService {

    @PostConstruct
    fun start(){
        val hb: Flux<HeartBeat> = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }
        val provider = Provider("test", hb)
    }

}
////////////////////////

open class Provider(name: String, heartBests: Flux<HeartBeat>) {
    companion object {
        val log = LoggerFactory.getLogger(Provider::class.java)!!
    }

    init {
        log.info("$name provider started")
        heartBests.doOnComplete { log.info("on complete") }
        heartBests.doOnEach { onBeat(it.get().number) }
    }

    fun onBeat(n: Number){
        log.info("Beat $n")
    }
}

/////
class HeartBeat(val number: Number)

在您的代码中,来自 'doOnComplete' 的 lambda 从未被调用,因为您创建了无限流。方法 'doOnEach' 因为 'map' 是中间操作(如流中的 map),它不会调用。 你还有另一个错误,reactive suggests "fluent pattern"。

试试这个简单的例子:

import reactor.core.publisher.Flux
import java.time.Duration

fun main(args: Array<String>) {
    val flux = Flux.interval(Duration.ofSeconds(1)).map { HeartBeat(it) }

    println("start")

    flux.take(3)
            .doOnEach { println("on each $it") }
            .map { println("before map");HeartBeat(it.value * 2) }
            .doOnNext { println("on next $it") }
            .doOnComplete { println("on complete") }
            .subscribe { println("subscribe $it") }

    Thread.sleep(5000)
}

data class HeartBeat(val value: Long)

这里有三个非常常见的错误:

  • 运算符喜欢 doOnEach return 一个新的 Flux 具有添加行为的实例,因此您需要(重新)分配给变量或使用流畅的样式
  • 直到你 subscribe()(或者它的变体。blockXXXsubscribe 在引擎盖下做......)
  • 这样的管道是完全异步的,并且由于源 interval 的时间维度而在单独的 Thread 上运行。因此,即使您已订阅,控件也会立即在 init 中 return,这可能会导致主线程和应用程序退出。