Spring Flux 和 Async 注释
Spring Flux and the Async annotation
我有一个 Spring Flux 应用程序,有时我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成。
如果没有反应器,我可能只会使用 Async 注释,在不同的线程上执行该方法。
对于 reactor,我不确定我是否应该继续使用该方法,或者是否已经有一个内置机制可以让我完成此操作。
例如,给定一个接受 Resource 对象的 Controller:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
processor.run(r); // the caller should not wait for the resource to be processed
return repository.save(r);
}
还有一个处理器 class:
@Async
void run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.retrieve()
.bodyToMono(String.class);
String response = result.block(); //block for now
}
/create
的 HTTP 调用者不需要等到 run
方法完成。
我做了一些测试,我认为即使使用 subscribe()
作为即发即忘也会等待请求完成,然后再将答案返回给网络浏览器或 REST 客户端(至少在我的简单测试中,它看起来像那样)。因此,您必须执行与@Async 类似的操作,创建另一个线程:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
return processor.run(r)
.subscribeOn(Schedulers.elastic()) // put eveything above this line on another thread
.doOnNext(string -> repository.save(r)); // persist "r", not changing it, though
}
还有一个处理器 class:
Mono<String> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class);
}
如果您正在寻找 即发即弃 模式实施,您可以订阅您的发布者
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
run(r).subscribe();
return repository.save(r);
}
Mono<Void> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class)
.then();
}
如果您的发布者执行阻塞操作,则应使用弹性或并行调度程序在其他线程上订阅它。
我有一个 Spring Flux 应用程序,有时我需要在后台执行一些繁重的任务,调用者(HTTP 请求)不需要等到该任务完成。
如果没有反应器,我可能只会使用 Async 注释,在不同的线程上执行该方法。 对于 reactor,我不确定我是否应该继续使用该方法,或者是否已经有一个内置机制可以让我完成此操作。
例如,给定一个接受 Resource 对象的 Controller:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
processor.run(r); // the caller should not wait for the resource to be processed
return repository.save(r);
}
还有一个处理器 class:
@Async
void run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.get()
.retrieve()
.bodyToMono(String.class);
String response = result.block(); //block for now
}
/create
的 HTTP 调用者不需要等到 run
方法完成。
我做了一些测试,我认为即使使用 subscribe()
作为即发即忘也会等待请求完成,然后再将答案返回给网络浏览器或 REST 客户端(至少在我的简单测试中,它看起来像那样)。因此,您必须执行与@Async 类似的操作,创建另一个线程:
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
return processor.run(r)
.subscribeOn(Schedulers.elastic()) // put eveything above this line on another thread
.doOnNext(string -> repository.save(r)); // persist "r", not changing it, though
}
还有一个处理器 class:
Mono<String> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class);
}
如果您正在寻找 即发即弃 模式实施,您可以订阅您的发布者
@PostMapping("/create")
public Mono<Resource> create(@Valid @RequestBody Resource r) {
run(r).subscribe();
return repository.save(r);
}
Mono<Void> run(Resource r) {
WebClient webClient = WebClient.create("http://localhost:8080");
return webClient.get()
.retrieve()
.bodyToMono(String.class)
.then();
}
如果您的发布者执行阻塞操作,则应使用弹性或并行调度程序在其他线程上订阅它。