为什么我的 Flux 流 运行 并行跳过一些记录?
Why does my Flux stream running in parallel skipping some records?
我正在尝试理解 Flux.parallel()
的工作原理。
我的要求如下:
有一个字符串列表 stringList
,假设我们正在对每个字符串并行执行一些任务,其中一个字符串需要一些时间,例如这里我使用的是 Thread.sleep
,那么我想收集 Flux<String>
中的字符串,但在此之前我需要确定所有字符串的并行执行是否完成。
在下面的代码中,它跳过了字符串Mango
,这是为什么呢?
如果我使用 blockLast()
那么它不会跳过,但是如果我使用 blockLast()
,它不会给出 return Flux<String>
。
另外,sequential
和 blockLast
有什么区别?
List<String> list = Arrays.asList("Mango", "Apple", "Grapes", "Java");
Flux<String> flust = Flux.fromIterable(list)
.parallel(10)
.runOn(Schedulers.parllel(), list.size())
.map(string -> {
if(string.equals("Mango")) {
try {
Thread.sleep(2000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
//some task
System.out.println(Thread.currentThread().getName() + " " + string);
return string;
})
.sequential();
flust.subscribe(System.out::println);
输出:
parallel-2 Apple
并行 4 Java
平行 3 颗葡萄
苹果
葡萄
Java
Reactorjavadoc订阅方法中提到:
Keep in mind that since the sequence can be asynchronous, this will
immediately return control to the calling thread. This can give the
impression the consumer is not invoked when executing in a main thread
or a unit test for instance.
这意味着到达了 main 方法的末尾,因此主线程在 Mango
元素被消耗之前退出。
要解决这个问题,您可以使用 blockLast()
而不是 subscribe()
并在 doOnNext
方法中触发 side-effect (打印):
Flux<String> flust = Flux.fromIterable(list)
.parallel(10)
.runOn(Schedulers.parallel(), list.size())
.map(string -> {
if (string.equals("Mango")) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//some task
System.out.println(Thread.currentThread().getName() + " " + string);
return string;
})
.sequential()
.doOnNext(System.out::println);
flust.blockLast();
在这里,您阻止主线程上的执行,直到处理完 Flux
上的最后一个元素。
我正在尝试理解 Flux.parallel()
的工作原理。
我的要求如下:
有一个字符串列表 stringList
,假设我们正在对每个字符串并行执行一些任务,其中一个字符串需要一些时间,例如这里我使用的是 Thread.sleep
,那么我想收集 Flux<String>
中的字符串,但在此之前我需要确定所有字符串的并行执行是否完成。
在下面的代码中,它跳过了字符串Mango
,这是为什么呢?
如果我使用 blockLast()
那么它不会跳过,但是如果我使用 blockLast()
,它不会给出 return Flux<String>
。
另外,sequential
和 blockLast
有什么区别?
List<String> list = Arrays.asList("Mango", "Apple", "Grapes", "Java");
Flux<String> flust = Flux.fromIterable(list)
.parallel(10)
.runOn(Schedulers.parllel(), list.size())
.map(string -> {
if(string.equals("Mango")) {
try {
Thread.sleep(2000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
//some task
System.out.println(Thread.currentThread().getName() + " " + string);
return string;
})
.sequential();
flust.subscribe(System.out::println);
输出:
parallel-2 Apple
并行 4 Java
平行 3 颗葡萄
苹果
葡萄
Java
Reactorjavadoc订阅方法中提到:
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
这意味着到达了 main 方法的末尾,因此主线程在 Mango
元素被消耗之前退出。
要解决这个问题,您可以使用 blockLast()
而不是 subscribe()
并在 doOnNext
方法中触发 side-effect (打印):
Flux<String> flust = Flux.fromIterable(list)
.parallel(10)
.runOn(Schedulers.parallel(), list.size())
.map(string -> {
if (string.equals("Mango")) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//some task
System.out.println(Thread.currentThread().getName() + " " + string);
return string;
})
.sequential()
.doOnNext(System.out::println);
flust.blockLast();
在这里,您阻止主线程上的执行,直到处理完 Flux
上的最后一个元素。