在计划任务中使用 Flux
Working with Flux in a Scheduled Task
我正在开发一个 Spring Webflux 项目,在尝试在计划任务中发布和使用 Flux 时遇到了问题。
@Scheduled(fixedRate = 20*1000)
fun updateNews() {
try {
logger.info("Automatic Update at: ${LocalDateTime.now()}")
articleRepository.saveAll(
sourceRepository.findAll().publishOn(Schedulers.parallel())
.map { source -> source.generate() }
.flatMap { it?.read() ?: Flux.empty() })
.timeout(Duration.ofSeconds(20)
).subscribeOn(Schedulers.parallel())
} catch(e: Throwable) {
logger.log(Level.SEVERE, "Error in Scheduler", e)
}
}
我配置的调度程序:
ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))
这个任务永远不会完成,除非我故意阻塞在最后:
.then().block()
我最初没有理会直接引用 publish/subscribe 调度程序,我已经尝试了所有似乎合理的选项都没有效果。
我的日志事件发生了,但似乎当来自调度程序的用于此任务的线程死亡时,通量也是垃圾;即使我指定了 publishOn 和 subscribeOn 行为后它们应该在自己的线程池中?
我想使此操作完全反应,如有任何建议,我们将不胜感激。
@Scheduled
未与 Flux
集成,因此如果您 return 它不知道如何处理 Flux
。结合 Reactor(以及一般的 Reactive Streams)这一事实,在您 subscribe()
之前通常不会发生任何事情,您可以开始看到哪里出了问题。
block()
实际上是 subscribe()
的一种形式,这就是将其添加到代码后它会起作用的原因。它实际上可能是这里最好的选择,因为您需要将一段反应式代码(来自 ReactiveRepository
)桥接到命令式阻塞世界(您的 @Scheduled fun
)。
我正在开发一个 Spring Webflux 项目,在尝试在计划任务中发布和使用 Flux 时遇到了问题。
@Scheduled(fixedRate = 20*1000)
fun updateNews() {
try {
logger.info("Automatic Update at: ${LocalDateTime.now()}")
articleRepository.saveAll(
sourceRepository.findAll().publishOn(Schedulers.parallel())
.map { source -> source.generate() }
.flatMap { it?.read() ?: Flux.empty() })
.timeout(Duration.ofSeconds(20)
).subscribeOn(Schedulers.parallel())
} catch(e: Throwable) {
logger.log(Level.SEVERE, "Error in Scheduler", e)
}
}
我配置的调度程序:
ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))
这个任务永远不会完成,除非我故意阻塞在最后:
.then().block()
我最初没有理会直接引用 publish/subscribe 调度程序,我已经尝试了所有似乎合理的选项都没有效果。
我的日志事件发生了,但似乎当来自调度程序的用于此任务的线程死亡时,通量也是垃圾;即使我指定了 publishOn 和 subscribeOn 行为后它们应该在自己的线程池中?
我想使此操作完全反应,如有任何建议,我们将不胜感激。
@Scheduled
未与 Flux
集成,因此如果您 return 它不知道如何处理 Flux
。结合 Reactor(以及一般的 Reactive Streams)这一事实,在您 subscribe()
之前通常不会发生任何事情,您可以开始看到哪里出了问题。
block()
实际上是 subscribe()
的一种形式,这就是将其添加到代码后它会起作用的原因。它实际上可能是这里最好的选择,因为您需要将一段反应式代码(来自 ReactiveRepository
)桥接到命令式阻塞世界(您的 @Scheduled fun
)。