在计划任务中使用 Flux

Working with Flux in a Scheduled Task

我正在开发一个 Spring Webflux 项目,在尝试在计划任务中发布和使用 Flux 时遇到了问题。

@Scheduled(fixedRate = 20*1000)
fun updateNews() {
    try {
        logger.info("Automatic Update at: ${LocalDateTime.now()}")
        articleRepository.saveAll(
                sourceRepository.findAll().publishOn(Schedulers.parallel())
                        .map { source -> source.generate() }
                        .flatMap { it?.read() ?: Flux.empty() })
                        .timeout(Duration.ofSeconds(20)
        ).subscribeOn(Schedulers.parallel())
    } catch(e: Throwable) {
        logger.log(Level.SEVERE, "Error in Scheduler", e)
    }
}

我配置的调度程序:

ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))

这个任务永远不会完成,除非我故意阻塞在最后:

.then().block()

我最初没有理会直接引用 publish/subscribe 调度程序,我已经尝试了所有似乎合理的选项都没有效果。

我的日志事件发生了,但似乎当来自调度程序的用于此任务的线程死亡时,通量也是垃圾;即使我指定了 publishOn 和 subscribeOn 行为后它们应该在自己的线程池中?

我想使此操作完全反应,如有任何建议,我们将不胜感激。

@Scheduled 未与 Flux 集成,因此如果您 return 它不知道如何处理 Flux。结合 Reactor(以及一般的 Reactive Streams)这一事实,在您 subscribe() 之前通常不会发生任何事情,您可以开始看到哪里出了问题。

block() 实际上是 subscribe() 的一种形式,这就是将其添加到代码后它会起作用的原因。它实际上可能是这里最好的选择,因为您需要将一段反应式代码(来自 ReactiveRepository)桥接到命令式阻塞世界(您的 @Scheduled fun)。