Project Reactor - 并行执行
Project Reactor - Parallel Execution
我有下面的助焊剂,
@Test
public void fluxWithRange_CustomTest() {
Flux<Integer> intFlux = Flux.range(1, 10).flatMap(i -> {
if (i % 2 == 0) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return Mono.just(i);
} else {
return Mono.just(i);
}
}, 2).subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();
StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).verifyComplete();
}
我原以为它会 运行 并行执行,但是,它只在 1 个线程中执行。
当“某人”订阅您的 Flux 时,subscribeOn 方法仅提供一种将执行转移到不同线程的方法。这意味着当您使用 StepVerifier 时,您正在订阅通量,并且因为您定义了一个调度程序,所以执行被移动到调度程序提供的线程之一。这并不意味着 Flux 将在多个线程之间跳转。
您期望的行为可以通过添加第二个 subscribeOn 来归档,但是添加到您在 flatMap 中使用的 Mono。当 flatMap 现在订阅内容时,它将使用另一个线程。
如果您将代码更改为如下内容:
@Test
public void fluxWithRange_CustomTest() throws InterruptedException {
Flux<Integer> intFlux = Flux.range(1, 10)
.flatMap(i -> subFlux(i),2)
.subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();
StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8,9,10).verifyComplete(); //This now fails.
}
private Mono<Integer> subFlux(int i) {
Mono<Integer> result = Mono.create(sink ->
{
if (i % 2 == 0) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.success(i);
});
return result.subscribeOn(Schedulers.newBoundedElastic(2, 2, "other"));
}
我有下面的助焊剂,
@Test
public void fluxWithRange_CustomTest() {
Flux<Integer> intFlux = Flux.range(1, 10).flatMap(i -> {
if (i % 2 == 0) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return Mono.just(i);
} else {
return Mono.just(i);
}
}, 2).subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();
StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).verifyComplete();
}
我原以为它会 运行 并行执行,但是,它只在 1 个线程中执行。
当“某人”订阅您的 Flux 时,subscribeOn 方法仅提供一种将执行转移到不同线程的方法。这意味着当您使用 StepVerifier 时,您正在订阅通量,并且因为您定义了一个调度程序,所以执行被移动到调度程序提供的线程之一。这并不意味着 Flux 将在多个线程之间跳转。
您期望的行为可以通过添加第二个 subscribeOn 来归档,但是添加到您在 flatMap 中使用的 Mono。当 flatMap 现在订阅内容时,它将使用另一个线程。
如果您将代码更改为如下内容:
@Test
public void fluxWithRange_CustomTest() throws InterruptedException {
Flux<Integer> intFlux = Flux.range(1, 10)
.flatMap(i -> subFlux(i),2)
.subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();
StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8,9,10).verifyComplete(); //This now fails.
}
private Mono<Integer> subFlux(int i) {
Mono<Integer> result = Mono.create(sink ->
{
if (i % 2 == 0) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.success(i);
});
return result.subscribeOn(Schedulers.newBoundedElastic(2, 2, "other"));
}