StreamEx.parallel().forEach() 不会在 .map() 之后并行 运行
StreamEx.parallel().forEach() does not run in parallel after .map()
我注意到,如果我使用 StreamEx lib 通过自定义 ForkJoinPool 并行处理我的流,如下所示 - 随后的操作会在该池的并行线程中执行 运行。但是,如果我添加一个 map() 操作并并行生成流 - 仅使用池中的一个线程。
下面是演示此问题的最小工作示例的完整代码(没有所有导入)。 executeAsParallelFromList() 和 executeAsParallelAfterMap() 方法之间的唯一区别是在 .parallel() 之前添加了 .map(...) 调用。
import one.util.streamex.StreamEx;
public class ParallelExample {
private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);
public static List<String> getTestList(){
int listSize = 10;
List<String> testList = new ArrayList<>();
for (int i=0; i<listSize; i++)
testList.add("item_" + i);
return testList;
}
public static void executeAsParallelFromList(){
logger.info("executeAsParallelFromList():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
public static void executeAsParallelAfterMap(){
logger.info("executeAsParallelAfterMap():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.map(item -> item+"_mapped")
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
private static void handleItem(String item){
// do something with the item - just print for now
logger.info("I'm handling item: {}", item);
}
}
执行两种方法的单元测试:
public class ParallelExampleTest {
@Test
public void testExecuteAsParallelFromList() {
ParallelExample.executeAsParallelFromList();
}
@Test
public void testExecuteAsParallelFromStreamEx() {
ParallelExample.executeAsParallelAfterMap();
}
}
执行结果:
08:49:12.992 [main] INFO marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_7
08:49:13.043 [main] INFO marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9_mapped
如您所见,执行executeAsParallelFromList()时使用了所有三个线程,但执行executeAsParallelAfterMap()时只使用了一个线程。
为什么?
谢谢!
玛丽娜
注意:该示例有意简化 - 我试图使其尽可能简单以演示该问题。显然,在现实生活中,map()、handleItem() 等中发生的事情要多得多,输入数据也更有趣(我正在尝试并行处理 AWS S3 buckets/prefixes)。
问题是,一旦您调用 map(...)
方法,StreamEx 就会创建基础 Java 8 流,并从那时起使用 sequential/parallel 配置(即顺序) ,然后调用 parallel(...)
似乎不会更新基础 Java 8 流。
解决方案取决于您要实现的目标。如果您对 map(...)
操作也能并行 运行 感到高兴,那么只需将 parallel(...)
操作向上移动,使其成为 of(...)
之后的第一件事.
但是,如果您希望某些操作在某些并行操作之前按顺序执行,那么您最好使用两个流。例如,按照您的示例代码的样式:
public static void executeAsParallelAfterMapV2() {
logger.info("executeAsParallelAfterMapV2():");
List<String> testList = getTestList();
StreamEx<String> sequentialStream = StreamEx
.of(testList)
.map(item -> {
logger.info("Mapping {}", item);
return item + "_mapped";
});
logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());
List<String> afterSequentialProcessing = sequentialStream.toList();
StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
这给出了类似的东西:
20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
旁白...
出于兴趣,如果您直接创建一个 Java 8 流(不使用 StreamEx),并将 parallel()
操作放在 map(...)
下方,那么它会更新要并行的(整个)流的类型:
public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
logger.info("executeAsParallelAfterMapJava8Stream():");
List<String> testList = getTestList();
s3ThreadPool.submit(() -> {
Stream<String> streamOfItems = testList.stream()
.map(item -> {
logger.info("Mapping {}", item);
return item + "_mapped";
})
.parallel();
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}).join();
}
如果您创建类似的单元测试,那么您会得到类似于:
20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
简单的回答:这是一个错误。我filed and fixed它。测试忽略了这一点,因为测试只检查所有操作是否在指定的池中执行,但不检查是否使用了池的不同线程(有时如果并行化不起作用也没关系,例如仅一个元素的流).
0.6.4 版本中提供了修复程序。在以前的版本中,您可以考虑使用 .parallel().parallel(fjp)
解决该问题:它应该正确并行化。
请考虑向官方 StreamEx issue tracker 报告 StreamEx 问题。这些天我只是偶尔访问Whosebug,所以可能会忽略这里报告的问题。
我注意到,如果我使用 StreamEx lib 通过自定义 ForkJoinPool 并行处理我的流,如下所示 - 随后的操作会在该池的并行线程中执行 运行。但是,如果我添加一个 map() 操作并并行生成流 - 仅使用池中的一个线程。
下面是演示此问题的最小工作示例的完整代码(没有所有导入)。 executeAsParallelFromList() 和 executeAsParallelAfterMap() 方法之间的唯一区别是在 .parallel() 之前添加了 .map(...) 调用。
import one.util.streamex.StreamEx;
public class ParallelExample {
private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);
public static List<String> getTestList(){
int listSize = 10;
List<String> testList = new ArrayList<>();
for (int i=0; i<listSize; i++)
testList.add("item_" + i);
return testList;
}
public static void executeAsParallelFromList(){
logger.info("executeAsParallelFromList():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
public static void executeAsParallelAfterMap(){
logger.info("executeAsParallelAfterMap():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.map(item -> item+"_mapped")
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
private static void handleItem(String item){
// do something with the item - just print for now
logger.info("I'm handling item: {}", item);
}
}
执行两种方法的单元测试:
public class ParallelExampleTest {
@Test
public void testExecuteAsParallelFromList() {
ParallelExample.executeAsParallelFromList();
}
@Test
public void testExecuteAsParallelFromStreamEx() {
ParallelExample.executeAsParallelAfterMap();
}
}
执行结果:
08:49:12.992 [main] INFO marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_7
08:49:13.043 [main] INFO marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9_mapped
如您所见,执行executeAsParallelFromList()时使用了所有三个线程,但执行executeAsParallelAfterMap()时只使用了一个线程。
为什么?
谢谢!
玛丽娜
注意:该示例有意简化 - 我试图使其尽可能简单以演示该问题。显然,在现实生活中,map()、handleItem() 等中发生的事情要多得多,输入数据也更有趣(我正在尝试并行处理 AWS S3 buckets/prefixes)。
问题是,一旦您调用 map(...)
方法,StreamEx 就会创建基础 Java 8 流,并从那时起使用 sequential/parallel 配置(即顺序) ,然后调用 parallel(...)
似乎不会更新基础 Java 8 流。
解决方案取决于您要实现的目标。如果您对 map(...)
操作也能并行 运行 感到高兴,那么只需将 parallel(...)
操作向上移动,使其成为 of(...)
之后的第一件事.
但是,如果您希望某些操作在某些并行操作之前按顺序执行,那么您最好使用两个流。例如,按照您的示例代码的样式:
public static void executeAsParallelAfterMapV2() {
logger.info("executeAsParallelAfterMapV2():");
List<String> testList = getTestList();
StreamEx<String> sequentialStream = StreamEx
.of(testList)
.map(item -> {
logger.info("Mapping {}", item);
return item + "_mapped";
});
logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());
List<String> afterSequentialProcessing = sequentialStream.toList();
StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
这给出了类似的东西:
20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
旁白...
出于兴趣,如果您直接创建一个 Java 8 流(不使用 StreamEx),并将 parallel()
操作放在 map(...)
下方,那么它会更新要并行的(整个)流的类型:
public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
logger.info("executeAsParallelAfterMapJava8Stream():");
List<String> testList = getTestList();
s3ThreadPool.submit(() -> {
Stream<String> streamOfItems = testList.stream()
.map(item -> {
logger.info("Mapping {}", item);
return item + "_mapped";
})
.parallel();
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}).join();
}
如果您创建类似的单元测试,那么您会得到类似于:
20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
简单的回答:这是一个错误。我filed and fixed它。测试忽略了这一点,因为测试只检查所有操作是否在指定的池中执行,但不检查是否使用了池的不同线程(有时如果并行化不起作用也没关系,例如仅一个元素的流).
0.6.4 版本中提供了修复程序。在以前的版本中,您可以考虑使用 .parallel().parallel(fjp)
解决该问题:它应该正确并行化。
请考虑向官方 StreamEx issue tracker 报告 StreamEx 问题。这些天我只是偶尔访问Whosebug,所以可能会忽略这里报告的问题。