Java 反应堆 - 链 Mono<Void> 与另一个异步任务生成 Mono<Object>
Java reactor - chain Mono<Void> with another async task producing Mono<Object>
我有以下异步任务:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
及以下服务 class:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be invoked at all.
.flatMap(dontcare -> this.save.execute(o))
}
}
如上所示,如果 AsyncValidationTask.execute
成功完成,我尝试使用 flatMap
链接 AsyncSaveTask.execute
调用,它不会工作,因为没有发出任何内容(Mono.empty) 完成后。
我还考虑 then
链接第二个调用,但它总是会调用链接的调用,而不管第一个验证调用产生的 Mono.error。
如何正确链接它们?
.then
仅供终端源链接
使用.then
,以便链接您的执行与进程,它只发送一个终端信号。
另外,注意,如果你需要对错误信号做些什么,那么你必须事先伴随着你的.then
和onErrorResume
。
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer
为了推迟单声道创作
为了仅在验证成功的情况下执行 this.save.execute(o)
,您还必须将其包装在 Mono.defer
中:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}
通常不需要,因为Mono
是一个 LAZY 类型 SHOULD start 仅在 情况下工作订阅 发生了(订阅== .subscribe()
)。
Mono#then
的实现保证 this.save.execute
方法返回的 Mono
订阅开始 在 Mono.defer(() -> this.validation.execute(o))
完成 之后。
执行可能提前开始的唯一原因可能是目的(例如,有意提供此类行为的业务逻辑 - 缓存;热源等)OR INCORRECT 实现 this.save.execute(o)
无论实际订阅如何,它都会开始工作。
正确设计您的实现
一般来说,确保 API 确实有效并将其公开为 Publisher
(例如 Mono
| Flux
)是一个很好的做法懒惰的。
这意味着 API 创建者必须确保只有在用户订阅了给定的 Publisher
实例的情况下才会执行工作。
例如,如果您的异步 API 在下面进行 CompletableFuture
创建,则值得手动将您的 CompletableFuture
创建包装到 Mono.defer
中或使用适当的方法扩展,例如 Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
执行器示例
让我们考虑如何使常规 ThreadPool 任务提交 Reactive。
interface Executor {
Future<T> execute(Callable<T> runnable);
}
因此,为了使 Executor
具有响应性,我们必须创建如下内容:
interface ReactiveExecutor {
Mono<T> execute(Callable<T> runnable);
}
实施不正确
以下是此类适配器的可能实现:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
}
}
当然,这样的实现会起作用。但是,它有几个关键问题:
- 执行从方法调用开始(这与反应流的惰性行为有些矛盾
Publisher
)
- 由于执行在实际任务订阅之前开始,我们必须创建一个有状态的
Mono
,它支持稍后的订阅。
- 此实现不处理根本没有订阅者的情况(例如,执行已经开始,但没有发生
.subscribe
方法(然后我们得到无法处理的值泄漏)
- 总的来说太hacky了,无法成为解决方案。此外,为了防止前面提到的所有情况,有必要在实现之外用
Mono.defer
包装 Mono execute(..)
上的每个调用(请参阅问题中的原始问题)。随后,它导致 API 用户很容易 'shoot your self in the foot' 忘记用额外的 .defer
包装执行
那么,怎么解决呢?
基本上,将 Mono.defer
移动到库内部就足够了。这将使 API 用户的生活更轻松,因为他们不必考虑何时需要使用延迟(因此 - 可能出现的问题更少)。
例如,我们的 Reactive Executor 最简单的解决方案可以是:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.defer(() -> {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
})
}
}
通过延迟执行,我们至少可以解决一个问题——确保值不再泄露。
但是,如何正确解决呢?
但是,为了解决该特定情况下所有可能出现的问题,我们可以使用 Mono.create
,它是为采用异步而设计的 API:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.crete(monoSink -> {
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
monoSink.complete(value);
});
monoSink.doOnCancel(task::cancel);
})
}
}
使用 Mono.create
我们可以保证对每个订阅者进行惰性执行。
此外,使用 MonoSink
API,我们可以快速挂接来自订阅者的所有基本信号。
最后,Mono.create 确保在发生任何情况时,该值将被适当丢弃。
最后,有了这样的 API 就没有必要对所有情况都使用 defer
不确定我是否正确理解了这个问题,但是..看看 OP 问题中方法的签名,对于 ExecutorService
,它们看起来并不像 "async tasks",它们看起来像仅仅是 Mono
生产方法,在 'reactive' 世界中经常遇到的东西。那么问题就是将它们与
之类的东西结合起来
Mono<Object> validateAndSave(Object o) {
return validation.execute(o)
.then(save.execute(o));
.then
将忽略源发出的元素(即 validation.execute
),但不会忽略错误信号。所以在 onError
的情况下,你的方法将 return Mono.error()
,否则它将 return what save.execute
returned.
我有以下异步任务:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
及以下服务 class:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be invoked at all.
.flatMap(dontcare -> this.save.execute(o))
}
}
如上所示,如果 AsyncValidationTask.execute
成功完成,我尝试使用 flatMap
链接 AsyncSaveTask.execute
调用,它不会工作,因为没有发出任何内容(Mono.empty) 完成后。
我还考虑 then
链接第二个调用,但它总是会调用链接的调用,而不管第一个验证调用产生的 Mono.error。
如何正确链接它们?
.then
仅供终端源链接
使用.then
,以便链接您的执行与进程,它只发送一个终端信号。
另外,注意,如果你需要对错误信号做些什么,那么你必须事先伴随着你的.then
和onErrorResume
。
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer
为了推迟单声道创作
为了仅在验证成功的情况下执行 this.save.execute(o)
,您还必须将其包装在 Mono.defer
中:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}
通常不需要,因为Mono
是一个 LAZY 类型 SHOULD start 仅在 情况下工作订阅 发生了(订阅== .subscribe()
)。
Mono#then
的实现保证 this.save.execute
方法返回的 Mono
订阅开始 在 Mono.defer(() -> this.validation.execute(o))
完成 之后。
执行可能提前开始的唯一原因可能是目的(例如,有意提供此类行为的业务逻辑 - 缓存;热源等)OR INCORRECT 实现 this.save.execute(o)
无论实际订阅如何,它都会开始工作。
正确设计您的实现
一般来说,确保 API 确实有效并将其公开为 Publisher
(例如 Mono
| Flux
)是一个很好的做法懒惰的。
这意味着 API 创建者必须确保只有在用户订阅了给定的 Publisher
实例的情况下才会执行工作。
例如,如果您的异步 API 在下面进行 CompletableFuture
创建,则值得手动将您的 CompletableFuture
创建包装到 Mono.defer
中或使用适当的方法扩展,例如 Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
执行器示例
让我们考虑如何使常规 ThreadPool 任务提交 Reactive。
interface Executor {
Future<T> execute(Callable<T> runnable);
}
因此,为了使 Executor
具有响应性,我们必须创建如下内容:
interface ReactiveExecutor {
Mono<T> execute(Callable<T> runnable);
}
实施不正确
以下是此类适配器的可能实现:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
}
}
当然,这样的实现会起作用。但是,它有几个关键问题:
- 执行从方法调用开始(这与反应流的惰性行为有些矛盾
Publisher
) - 由于执行在实际任务订阅之前开始,我们必须创建一个有状态的
Mono
,它支持稍后的订阅。 - 此实现不处理根本没有订阅者的情况(例如,执行已经开始,但没有发生
.subscribe
方法(然后我们得到无法处理的值泄漏) - 总的来说太hacky了,无法成为解决方案。此外,为了防止前面提到的所有情况,有必要在实现之外用
Mono.defer
包装Mono execute(..)
上的每个调用(请参阅问题中的原始问题)。随后,它导致 API 用户很容易 'shoot your self in the foot' 忘记用额外的.defer
包装执行
那么,怎么解决呢?
基本上,将 Mono.defer
移动到库内部就足够了。这将使 API 用户的生活更轻松,因为他们不必考虑何时需要使用延迟(因此 - 可能出现的问题更少)。
例如,我们的 Reactive Executor 最简单的解决方案可以是:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.defer(() -> {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
})
}
}
通过延迟执行,我们至少可以解决一个问题——确保值不再泄露。
但是,如何正确解决呢?
但是,为了解决该特定情况下所有可能出现的问题,我们可以使用 Mono.create
,它是为采用异步而设计的 API:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.crete(monoSink -> {
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
monoSink.complete(value);
});
monoSink.doOnCancel(task::cancel);
})
}
}
使用 Mono.create
我们可以保证对每个订阅者进行惰性执行。
此外,使用 MonoSink
API,我们可以快速挂接来自订阅者的所有基本信号。
最后,Mono.create 确保在发生任何情况时,该值将被适当丢弃。
最后,有了这样的 API 就没有必要对所有情况都使用 defer
不确定我是否正确理解了这个问题,但是..看看 OP 问题中方法的签名,对于 ExecutorService
,它们看起来并不像 "async tasks",它们看起来像仅仅是 Mono
生产方法,在 'reactive' 世界中经常遇到的东西。那么问题就是将它们与
Mono<Object> validateAndSave(Object o) {
return validation.execute(o)
.then(save.execute(o));
.then
将忽略源发出的元素(即 validation.execute
),但不会忽略错误信号。所以在 onError
的情况下,你的方法将 return Mono.error()
,否则它将 return what save.execute
returned.