如何将 Mono 变成真正的异步(非响应式!)方法调用?
How to turn a Mono into a truly asynchronous (not reactive!) method call?
我有办法
@Service
public class MyService {
public Mono<Integer> processData() {
... // very long reactive operation
}
}
在正常的程序流程中,我通过Kafka事件异步调用这个方法。
出于测试目的,我需要将该方法作为 Web 服务公开,但该方法应作为异步公开:return仅使用 HTTP 代码 200 OK ( “请求已接受”)并继续在后台处理数据。
仅从控制器方法调用 Mono#subscribe()
和 return 是否可以(= 它没有任何不良副作用)?
@RestController
@RequiredArgsConstructor
public class MyController {
private final MyService service;
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
还是这样比较好(这里我被IntelliJ的警告弄糊涂了,可能和https://youtrack.jetbrains.com/issue/IDEA-276018一样?):
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
return Mono.empty();
}
或者其他解决方案?
你用下面的代码选项其实没问题:
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
这实际上是您在 @Scheduled
方法中所做的,该方法只是 returns 什么都没有,您明确订阅 Mono
或 Flux
以便发出元素。
Is it OK (= doesn't it have any unwanted side effects) just to call Mono#subscribe() and return from the controller method?
有副作用,但您可以接受它们:
- 这真的是一劳永逸 - 这意味着虽然您永远不会收到成功的通知(大多数人意识到这一点),但您也永远不会收到失败的通知(很少有人意识到这一点。)
- 如果进程由于某种原因挂起,发布者将永远不会完成,而您将无从知晓。由于您订阅的是有界弹性线程池,因此它也会无限期地占用其中一个有限线程。
第一点你可能会接受,或者你可能想把一些错误记录作为副作用以某种方式进一步记录在反应链中,这样如果出现问题你至少会有一个内部通知。
对于第二点 - 我建议在您的方法调用上设置一个(慷慨的)超时,这样如果它没有在设定的时间内完成,它至少会被取消,并且不再浪费资源。如果您是 运行 一个异步任务,那么这不是一个大问题,因为它只会消耗一点内存。如果您在弹性调度程序上包装一个阻塞调用,那么情况会更糟,因为您会无限期地占用该线程池中的一个线程。
我还质疑为什么您需要在这里完全使用有界弹性调度程序 - 它用于包装阻塞调用,这似乎不是此用例的基础。 (要明确的是,如果您的服务正在阻塞,那么您绝对应该将其包装在弹性调度程序上——但如果没有,则没有理由这样做。)
最后,这个例子:
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
return Mono.empty();
}
...是 不 做的一个很好的例子,因为你正在创建一种“冒名顶替的反应方法” - 有人可能会非常合理地订阅 returned 发布者认为它会在底层发布者完成时完成,这显然不是这里发生的事情。在这种情况下,使用 void
return 类型而不是 return 是正确的做法。
我有办法
@Service
public class MyService {
public Mono<Integer> processData() {
... // very long reactive operation
}
}
在正常的程序流程中,我通过Kafka事件异步调用这个方法。
出于测试目的,我需要将该方法作为 Web 服务公开,但该方法应作为异步公开:return仅使用 HTTP 代码 200 OK ( “请求已接受”)并继续在后台处理数据。
仅从控制器方法调用 Mono#subscribe()
和 return 是否可以(= 它没有任何不良副作用)?
@RestController
@RequiredArgsConstructor
public class MyController {
private final MyService service;
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
还是这样比较好(这里我被IntelliJ的警告弄糊涂了,可能和https://youtrack.jetbrains.com/issue/IDEA-276018一样?):
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
return Mono.empty();
}
或者其他解决方案?
你用下面的代码选项其实没问题:
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
这实际上是您在 @Scheduled
方法中所做的,该方法只是 returns 什么都没有,您明确订阅 Mono
或 Flux
以便发出元素。
Is it OK (= doesn't it have any unwanted side effects) just to call Mono#subscribe() and return from the controller method?
有副作用,但您可以接受它们:
- 这真的是一劳永逸 - 这意味着虽然您永远不会收到成功的通知(大多数人意识到这一点),但您也永远不会收到失败的通知(很少有人意识到这一点。)
- 如果进程由于某种原因挂起,发布者将永远不会完成,而您将无从知晓。由于您订阅的是有界弹性线程池,因此它也会无限期地占用其中一个有限线程。
第一点你可能会接受,或者你可能想把一些错误记录作为副作用以某种方式进一步记录在反应链中,这样如果出现问题你至少会有一个内部通知。
对于第二点 - 我建议在您的方法调用上设置一个(慷慨的)超时,这样如果它没有在设定的时间内完成,它至少会被取消,并且不再浪费资源。如果您是 运行 一个异步任务,那么这不是一个大问题,因为它只会消耗一点内存。如果您在弹性调度程序上包装一个阻塞调用,那么情况会更糟,因为您会无限期地占用该线程池中的一个线程。
我还质疑为什么您需要在这里完全使用有界弹性调度程序 - 它用于包装阻塞调用,这似乎不是此用例的基础。 (要明确的是,如果您的服务正在阻塞,那么您绝对应该将其包装在弹性调度程序上——但如果没有,则没有理由这样做。)
最后,这个例子:
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
return Mono.empty();
}
...是 不 做的一个很好的例子,因为你正在创建一种“冒名顶替的反应方法” - 有人可能会非常合理地订阅 returned 发布者认为它会在底层发布者完成时完成,这显然不是这里发生的事情。在这种情况下,使用 void
return 类型而不是 return 是正确的做法。