用反应堆发射并忘记
Fire and forget with reactor
我的 Spring 启动应用程序中有如下方法。
public Flux<Data> search(SearchRequest request) {
Flux<Data> result = searchService.search(request);//this returns Flux<Data>
Mono<List<Data>> listOfData = result.collectList();
// doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
return result;
}
//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
//do some processing here
}
目前,我正在使用 @Async
注释服务 class 和 doThisAsync
,但不知道如何传递 List<Data>
,因为我不知道想打电话给 block
。
我只有 Mono<List<Data>>
.
我的主要问题是如何单独处理这个 Mono,search
方法应该 return Flux<Data>
。
您是否考虑过 运行 使用 publishOn 在单独的线程中进行处理,如下例所示?
这可能不完全是您所要求的,但允许您继续处理其他事务,而通量中的结果处理由一个或多个线程完成,在我的示例中是四个线程,来自专用调度程序 (theFourThreadScheduler)。
@Test
public void processingInSeparateThreadTest() {
final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");
theResultFlux.log()
.collectList()
.publishOn(theFourThreadScheduler)
.subscribe(theStringList -> {
doThisAsync(theStringList);
});
System.out.println("Subscribed to the result flux");
for (int i = 0; i < 20; i++) {
System.out.println("Waiting for completion: " + i);
try {
Thread.sleep(300);
} catch (final InterruptedException theException) {
}
}
}
private void doThisAsync(final List<String> inStringList) {
for (final String theString : inStringList) {
System.out.println("Processing in doThisAsync: " + theString);
try {
Thread.sleep(500);
} catch (final InterruptedException theException) {
}
}
}
运行 该示例产生以下输出,表明在 doThisAsync() 中执行的处理是在后台执行的。
Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19
1,如果你的即发即弃已经异步返回Mono
/Flux
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public Mono<Void> doThisAsync(List<Data> data) {
//do some async/non-blocking processing here like calling WebClient
}
2,如果您的即发即弃确实阻塞了I/O
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
.subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow
.subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public void doThisAsync(List<Data> data) {
//do some blocking I/O on calling thread
}
请注意,在上述两种情况下,您都会失去背压支持。如果 doAsyncThis
由于某种原因变慢,那么数据生产者将不会关心并继续生产项目。这是一劳永逸机制的自然结果。
我的 Spring 启动应用程序中有如下方法。
public Flux<Data> search(SearchRequest request) {
Flux<Data> result = searchService.search(request);//this returns Flux<Data>
Mono<List<Data>> listOfData = result.collectList();
// doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
return result;
}
//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
//do some processing here
}
目前,我正在使用 @Async
注释服务 class 和 doThisAsync
,但不知道如何传递 List<Data>
,因为我不知道想打电话给 block
。
我只有 Mono<List<Data>>
.
我的主要问题是如何单独处理这个 Mono,search
方法应该 return Flux<Data>
。
您是否考虑过 运行 使用 publishOn 在单独的线程中进行处理,如下例所示? 这可能不完全是您所要求的,但允许您继续处理其他事务,而通量中的结果处理由一个或多个线程完成,在我的示例中是四个线程,来自专用调度程序 (theFourThreadScheduler)。
@Test
public void processingInSeparateThreadTest() {
final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");
theResultFlux.log()
.collectList()
.publishOn(theFourThreadScheduler)
.subscribe(theStringList -> {
doThisAsync(theStringList);
});
System.out.println("Subscribed to the result flux");
for (int i = 0; i < 20; i++) {
System.out.println("Waiting for completion: " + i);
try {
Thread.sleep(300);
} catch (final InterruptedException theException) {
}
}
}
private void doThisAsync(final List<String> inStringList) {
for (final String theString : inStringList) {
System.out.println("Processing in doThisAsync: " + theString);
try {
Thread.sleep(500);
} catch (final InterruptedException theException) {
}
}
}
运行 该示例产生以下输出,表明在 doThisAsync() 中执行的处理是在后台执行的。
Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19
1,如果你的即发即弃已经异步返回Mono
/Flux
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public Mono<Void> doThisAsync(List<Data> data) {
//do some async/non-blocking processing here like calling WebClient
}
2,如果您的即发即弃确实阻塞了I/O
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
.subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow
.subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public void doThisAsync(List<Data> data) {
//do some blocking I/O on calling thread
}
请注意,在上述两种情况下,您都会失去背压支持。如果 doAsyncThis
由于某种原因变慢,那么数据生产者将不会关心并继续生产项目。这是一劳永逸机制的自然结果。