使用 Flux 而不是 for 循环,有什么好处吗?

Using a Flux instead of a for loop, any benefits?

我正在努力了解响应式编程,所以我想问一下在这里使用 Flux 是否有任何好处:

override fun notifyObserversOnMessage(message: Message) {
        Flux.fromStream(observers.stream())
                .map { observer -> Mono.just(observer.reactOnMessage(message)) }
                .subscribe()        
    }

而不是:

override fun notifyObserversOnMessage(message: Message) {
        for (observer in observers) {
            observer.reactOnMessage(message)
        }       
    }

这是否取决于每个观察者所做的工作,是否是 IO?

视情况而定。

如果按顺序处理observers是合理的,这里使用Flux并没有什么好处

override fun notifyObserversOnMessage(message: Message) {
    observers.forEach { observer ->
        observer.reactOnMessage(message)
    }
}

或者干脆

override fun notifyObserversOnMessage(message: Message) {
    observers.forEach { it.reactOnMessage(message) }
}

没问题。

对于简单的并行性,

override fun notifyObserversOnMessage(message: Message) {
    observers.parallelStream().forEach { it.reactOnMessage(message) }
}

可以使用,但此时您更有可能有额外的要求,例如工作池或超时。在这种情况下,Reactor 的表现力就很有用了。