反应管道中的后台任务(即发即弃)

Background task in reactive pipeline (Fire-and-forget)

我有一个反应管道来处理传入的请求。对于每个请求,我都需要调用业务相关函数 (doSomeRelevantProcessing)。

完成后,我需要将发生的事情通知一些外部服务。管道的那部分应该不会增加整体响应时间。 此外,通知此外部系统不是业务关键:在管道的主要部分完成后快速响应比确保通知成功更重要。

据我所知,在不减慢整个过程的情况下 运行 在后台进行某些操作的唯一方法是直接在管道中订阅,从而实现即发即弃的心态。

除了在 flatmap 内订阅之外,还有其他好的选择吗? 我有点担心,如果通知外部服务花费的时间比原来的处理时间长,并且同时有很多请求进来,会发生什么情况。这会导致内存耗尽或整个进程阻塞吗?

fun runPipeline(incoming: Mono<Request>) = incoming
    .flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
    .flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical

fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing

fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
    val notification = "notification" // build an object from context

    // this uses non-blocking HTTP (i.e. webclient), so it can take a second or so 
    notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()

    Mono.just(Unit)
}

fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while

我假设您使用纯反应机制通知外部服务来回答这个问题——即您没有包装阻塞服务。如果您是,那么答案会有所不同,因为您受限于有界弹性线程池的大小,如果您第二次收到数百个请求,它可能很快就会不堪重负。

(假设您使用的是反应机制,那么就没有必要像您在示例中给出的那样 .subscribeOn(Schedulers.boundedElastic()),因为这不会给您带来任何好处 - 它是为包装遗留阻塞服务而设计的。)

Could this lead to a memory exhaustion

只有在非常极端的情况下才有可能,每个单独的请求使用的内存会很小。这几乎肯定不值得担心,如果您在这里开始看到内存问题,那么您几乎肯定会在其他地方遇到其他问题。

话虽这么说,我可能会建议在您的内部订阅方法之前添加 .timeout(Duration.ofSeconds(5)) 或类似内容,以确保请求在一段时间后因任何原因未工作时被终止 - 这将防止它们堆积。

...or [can this cause] the overall process to block?

这个更简单 - 一个简短的不,它不能。