使用 Project Reactor 管理对共享资源的访问
Manage access to shared resource with Project Reactor
如何使用 Project Reactor 管理对共享资源的访问?
给定一个虚构的关键组件,它当时只能执行操作(文件存储、昂贵的远程服务等),如果有多个访问该组件的点,如何以反应方式编排对该组件的访问(多个 API 方法、订阅者...)?如果资源有空执行操作,它应该立即执行,如果其他操作已经在进行中,将我的操作添加到队列并在我的操作完成后完成我的 Mono。
我的想法是将任务添加到逐个执行任务的 flux 队列和 return 一个 Mono,一旦队列中的任务完成,它就会完成,不会阻塞。
class CriticalResource {
private final Sinks.Many<Mono<?>> taskExecutor = Sinks.many()
.unicast()
.onBackpressureBuffer();
private final Disposable taskExecutorDisposable = taskExecutor.asFlux()
.concatMap(Function.identity()) //this executes actions in sequential order
.subscribe();
public Mono<Void> resourceOperation1() {
doSomething();
.as(this::sequential);
}
public Mono<Void> resourceOperation2() {
doSomethingElse();
.as(this::sequential);
}
public Mono<Void> resourceOperation3() {
doSomething();
.then(somethingElse())
.as(this::sequential);
}
private <T> Mono<T> sequential(Mono<T> action) {
return Mono.defer(() -> {
Sinks.One<T> actionResult = Sinks.one(); //create a new mono which should complete when our action mono completes.
//Since task executor subscribes to action mono, we are subscribing on action result mono
while (taskExecutor.tryEmitNext(action.doOnError(t -> actionResult.emitError(t,
Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(next -> actionResult.emitValue(next,
Sinks.EmitFailureHandler.FAIL_FAST)))
!= Sinks.EmitResult.OK) {
}
return actionResult.asMono();
});
}
}
这不是一个完整的示例,因为需要做更多的工作来正确传播背压、传输上下文等...但我想知道是否有更好的方法来实现这一点,并得到 Project Reactor 的支持?
这看起来像是 reactor-pool 本质上的简化版本。您是否考虑过将其与例如一起使用?最大尺寸为 1?
https://github.com/reactor/reactor-pool/
https://projectreactor.io/docs/pool/0.2.7/api/reactor/pool/Pool.html
这个池可能有点矫枉过正,因为它的开销是必须在多个 竞争借款人 之上处理多个 资源 ,例如在你的情况下,但也许它可以为你走得更远提供一些灵感。
如何使用 Project Reactor 管理对共享资源的访问?
给定一个虚构的关键组件,它当时只能执行操作(文件存储、昂贵的远程服务等),如果有多个访问该组件的点,如何以反应方式编排对该组件的访问(多个 API 方法、订阅者...)?如果资源有空执行操作,它应该立即执行,如果其他操作已经在进行中,将我的操作添加到队列并在我的操作完成后完成我的 Mono。
我的想法是将任务添加到逐个执行任务的 flux 队列和 return 一个 Mono,一旦队列中的任务完成,它就会完成,不会阻塞。
class CriticalResource {
private final Sinks.Many<Mono<?>> taskExecutor = Sinks.many()
.unicast()
.onBackpressureBuffer();
private final Disposable taskExecutorDisposable = taskExecutor.asFlux()
.concatMap(Function.identity()) //this executes actions in sequential order
.subscribe();
public Mono<Void> resourceOperation1() {
doSomething();
.as(this::sequential);
}
public Mono<Void> resourceOperation2() {
doSomethingElse();
.as(this::sequential);
}
public Mono<Void> resourceOperation3() {
doSomething();
.then(somethingElse())
.as(this::sequential);
}
private <T> Mono<T> sequential(Mono<T> action) {
return Mono.defer(() -> {
Sinks.One<T> actionResult = Sinks.one(); //create a new mono which should complete when our action mono completes.
//Since task executor subscribes to action mono, we are subscribing on action result mono
while (taskExecutor.tryEmitNext(action.doOnError(t -> actionResult.emitError(t,
Sinks.EmitFailureHandler.FAIL_FAST))
.doOnSuccess(next -> actionResult.emitValue(next,
Sinks.EmitFailureHandler.FAIL_FAST)))
!= Sinks.EmitResult.OK) {
}
return actionResult.asMono();
});
}
}
这不是一个完整的示例,因为需要做更多的工作来正确传播背压、传输上下文等...但我想知道是否有更好的方法来实现这一点,并得到 Project Reactor 的支持?
这看起来像是 reactor-pool 本质上的简化版本。您是否考虑过将其与例如一起使用?最大尺寸为 1?
https://github.com/reactor/reactor-pool/
https://projectreactor.io/docs/pool/0.2.7/api/reactor/pool/Pool.html
这个池可能有点矫枉过正,因为它的开销是必须在多个 竞争借款人 之上处理多个 资源 ,例如在你的情况下,但也许它可以为你走得更远提供一些灵感。