反应管道中的后台任务(即发即弃)
Background task in reactive pipeline (Fire-and-forget)
我有一个反应管道来处理传入的请求。对于每个请求,我都需要调用业务相关函数 (doSomeRelevantProcessing
)。
完成后,我需要将发生的事情通知一些外部服务。管道的那部分应该不会增加整体响应时间。
此外,通知此外部系统不是业务关键:在管道的主要部分完成后快速响应比确保通知成功更重要。
据我所知,在不减慢整个过程的情况下 运行 在后台进行某些操作的唯一方法是直接在管道中订阅,从而实现即发即弃的心态。
除了在 flatmap
内订阅之外,还有其他好的选择吗?
我有点担心,如果通知外部服务花费的时间比原来的处理时间长,并且同时有很多请求进来,会发生什么情况。这会导致内存耗尽或整个进程阻塞吗?
fun runPipeline(incoming: Mono<Request>) = incoming
.flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
.flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical
fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing
fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
val notification = "notification" // build an object from context
// this uses non-blocking HTTP (i.e. webclient), so it can take a second or so
notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()
Mono.just(Unit)
}
fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while
我假设您使用纯反应机制通知外部服务来回答这个问题——即您没有包装阻塞服务。如果您是,那么答案会有所不同,因为您受限于有界弹性线程池的大小,如果您第二次收到数百个请求,它可能很快就会不堪重负。
(假设您使用的是反应机制,那么就没有必要像您在示例中给出的那样 .subscribeOn(Schedulers.boundedElastic())
,因为这不会给您带来任何好处 - 它是为包装遗留阻塞服务而设计的。)
Could this lead to a memory exhaustion
只有在非常极端的情况下才有可能,每个单独的请求使用的内存会很小。这几乎肯定不值得担心,如果您在这里开始看到内存问题,那么您几乎肯定会在其他地方遇到其他问题。
话虽这么说,我可能会建议在您的内部订阅方法之前添加 .timeout(Duration.ofSeconds(5))
或类似内容,以确保请求在一段时间后因任何原因未工作时被终止 - 这将防止它们堆积。
...or [can this cause] the overall process to block?
这个更简单 - 一个简短的不,它不能。
我有一个反应管道来处理传入的请求。对于每个请求,我都需要调用业务相关函数 (doSomeRelevantProcessing
)。
完成后,我需要将发生的事情通知一些外部服务。管道的那部分应该不会增加整体响应时间。 此外,通知此外部系统不是业务关键:在管道的主要部分完成后快速响应比确保通知成功更重要。
据我所知,在不减慢整个过程的情况下 运行 在后台进行某些操作的唯一方法是直接在管道中订阅,从而实现即发即弃的心态。
除了在 flatmap
内订阅之外,还有其他好的选择吗?
我有点担心,如果通知外部服务花费的时间比原来的处理时间长,并且同时有很多请求进来,会发生什么情况。这会导致内存耗尽或整个进程阻塞吗?
fun runPipeline(incoming: Mono<Request>) = incoming
.flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
.flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical
fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing
fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
val notification = "notification" // build an object from context
// this uses non-blocking HTTP (i.e. webclient), so it can take a second or so
notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()
Mono.just(Unit)
}
fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while
我假设您使用纯反应机制通知外部服务来回答这个问题——即您没有包装阻塞服务。如果您是,那么答案会有所不同,因为您受限于有界弹性线程池的大小,如果您第二次收到数百个请求,它可能很快就会不堪重负。
(假设您使用的是反应机制,那么就没有必要像您在示例中给出的那样 .subscribeOn(Schedulers.boundedElastic())
,因为这不会给您带来任何好处 - 它是为包装遗留阻塞服务而设计的。)
Could this lead to a memory exhaustion
只有在非常极端的情况下才有可能,每个单独的请求使用的内存会很小。这几乎肯定不值得担心,如果您在这里开始看到内存问题,那么您几乎肯定会在其他地方遇到其他问题。
话虽这么说,我可能会建议在您的内部订阅方法之前添加 .timeout(Duration.ofSeconds(5))
或类似内容,以确保请求在一段时间后因任何原因未工作时被终止 - 这将防止它们堆积。
...or [can this cause] the overall process to block?
这个更简单 - 一个简短的不,它不能。