Flux.subscribe(Consumer<? super T> consumer>) 和 Flux.doOnNext(Consumer<? super T> onNext) 之间的区别
Difference between Flux.subscribe(Consumer<? super T> consumer>) and Flux.doOnNext(Consumer<? super T> onNext)
刚开始了解 Reactor 的反应式编程,我从这里的教程中看到了这个代码片段 building-a-chat-application-with-angular-and-spring-reactive-websocket
class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
val sink = Sinks.replay<Message>(100);
val outputMessages: Flux<Message> = sink.asFlux();
override fun handle(session: WebSocketSession): Mono<Void> {
println("handling WebSocketSession...")
session.receive()
.map { it.payloadAsText }
.map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
);
return session.send(
Mono.delay(Duration.ofMillis(100))
.thenMany(outputMessages.map { session.textMessage(toJson(it)) })
)
}
fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
我明白它的作用,但不明白为什么作者在订阅方法中使用消费者而不是链接另一个 doOnNext(消费者)。 IE。行:
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
从 Reactor 文档中我读到 Flux.subscribe(Consumer super T> consumer):
Subscribe a Consumer to this Flux that will consume all the elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer).
然而,我不明白为什么人们会选择一个而不是另一个,对我来说,它们在功能上似乎是相同的。
区别更多的是传统而非功能 - 区别在于副作用与最终消费者。
doOnXXX
系列方法用于在反应链执行时针对用户设计的副作用 - 日志记录是其中最常见的,但您可能还需要指标、分析等每个元素通过时的视图。所有这些的关键在于,将其中任何一个作为最终消费者(例如上面示例中的 println()
。)没有多大意义。
相反,subscribe()
消费者是“最终消费者”,通常由您的框架(例如Webflux)而不是用户代码调用-所以这种情况有点该规则的例外。在这种情况下,他们主动将此反应链中的消息传递到另一个接收器以进行进一步处理 - 因此将其作为“副作用”样式方法没有多大意义,因为您不希望 Flux继续超越这一点。
(附录:如上所述,reactor / Webflux 的正常方法是让 Webflux 处理订阅,这不是这里发生的事情。我没有详细查看是否有更明智的方法在没有用户订阅的情况下实现这一点,但根据我的经验,usually 是,手动调用订阅是 usually 有点代码味道作为结果。您当然应该尽可能避免在自己的代码中使用它。)
刚开始了解 Reactor 的反应式编程,我从这里的教程中看到了这个代码片段 building-a-chat-application-with-angular-and-spring-reactive-websocket
class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
val sink = Sinks.replay<Message>(100);
val outputMessages: Flux<Message> = sink.asFlux();
override fun handle(session: WebSocketSession): Mono<Void> {
println("handling WebSocketSession...")
session.receive()
.map { it.payloadAsText }
.map { Message(id= UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
);
return session.send(
Mono.delay(Duration.ofMillis(100))
.thenMany(outputMessages.map { session.textMessage(toJson(it)) })
)
}
fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
我明白它的作用,但不明白为什么作者在订阅方法中使用消费者而不是链接另一个 doOnNext(消费者)。 IE。行:
.doOnNext { println(it) }
.subscribe(
{ message: Message -> sink.next(message) },
{ error: Throwable -> sink.error(error) }
从 Reactor 文档中我读到 Flux.subscribe(Consumer super T> consumer):
Subscribe a Consumer to this Flux that will consume all the elements in the sequence. It will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observe and forward incoming data see doOnNext(java.util.function.Consumer).
然而,我不明白为什么人们会选择一个而不是另一个,对我来说,它们在功能上似乎是相同的。
区别更多的是传统而非功能 - 区别在于副作用与最终消费者。
doOnXXX
系列方法用于在反应链执行时针对用户设计的副作用 - 日志记录是其中最常见的,但您可能还需要指标、分析等每个元素通过时的视图。所有这些的关键在于,将其中任何一个作为最终消费者(例如上面示例中的 println()
。)没有多大意义。
相反,subscribe()
消费者是“最终消费者”,通常由您的框架(例如Webflux)而不是用户代码调用-所以这种情况有点该规则的例外。在这种情况下,他们主动将此反应链中的消息传递到另一个接收器以进行进一步处理 - 因此将其作为“副作用”样式方法没有多大意义,因为您不希望 Flux继续超越这一点。
(附录:如上所述,reactor / Webflux 的正常方法是让 Webflux 处理订阅,这不是这里发生的事情。我没有详细查看是否有更明智的方法在没有用户订阅的情况下实现这一点,但根据我的经验,usually 是,手动调用订阅是 usually 有点代码味道作为结果。您当然应该尽可能避免在自己的代码中使用它。)