Iterables.partition() 生成的 Spliterator 未按预期运行?
Spliterator generated by Iterables.partition() doesn't behave as expected?
我注意到使用 Guava 的 Iterables.partition(collection, partitionSize).spliterator()
生成的拆分器表现得很奇怪。
在生成的拆分器上执行 trySplit() 不会拆分,但在初始 trySplit() 的结果上执行 trySplit() 最终会拆分。
此外,使用 StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator(), true)
不会并行化流,但是
StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator().trySplit(), true)
执行并行化并且生成的流包含所有分区。
我的目标是:给定一个大小为 100k 的集合,我想将它分成大小为 5000 的批次并并行处理这些批次。
2 个问题:Iterables.partition 生成的拆分器是否正确运行?
我的方法是实现目标的好方法吗?
这里的问题是 Spliterator
来自一个没有已知大小的 Iterable
。因此,内部实现会将元素缓冲到大小为 1024
的缓冲区中,并在下一次迭代中继续增加缓冲区。我的意思是:
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();
Spliterator<List<Integer>> one = sp.trySplit();
System.out.println(one.getExactSizeIfKnown());
Spliterator<List<Integer>> two = sp.trySplit();
System.out.println(two.getExactSizeIfKnown());
Spliterator<List<Integer>> three = sp.trySplit();
System.out.println(three.getExactSizeIfKnown());
Spliterator<List<Integer>> four = sp.trySplit();
System.out.println(four.getExactSizeIfKnown());
这将打印:
1024
2048
3072
4096
如果您想一次处理 5000
个元素,您需要从具有已知大小的 Spliterator
开始。您可以先将这些分区放入 ArrayList
:
public static void main(String[] args) {
List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 5000);
List<List<Integer>> list = new ArrayList<>();
it.forEach(list::add);
StreamSupport.stream(list.spliterator(), true)
.map(x -> {
System.out.println(
"Thread : " + Thread.currentThread().getName() +
" processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
);
return x;
})
.flatMap(List::stream)
.collect(Collectors.toList());
}
在我的机器上,它显示它们分别由一个线程处理:
Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
Thread : main processed elements in the range : 5000 , 9999
我注意到使用 Guava 的 Iterables.partition(collection, partitionSize).spliterator()
生成的拆分器表现得很奇怪。
在生成的拆分器上执行 trySplit() 不会拆分,但在初始 trySplit() 的结果上执行 trySplit() 最终会拆分。
此外,使用 StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator(), true)
不会并行化流,但是
StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator().trySplit(), true)
执行并行化并且生成的流包含所有分区。
我的目标是:给定一个大小为 100k 的集合,我想将它分成大小为 5000 的批次并并行处理这些批次。
2 个问题:Iterables.partition 生成的拆分器是否正确运行? 我的方法是实现目标的好方法吗?
这里的问题是 Spliterator
来自一个没有已知大小的 Iterable
。因此,内部实现会将元素缓冲到大小为 1024
的缓冲区中,并在下一次迭代中继续增加缓冲区。我的意思是:
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();
Spliterator<List<Integer>> one = sp.trySplit();
System.out.println(one.getExactSizeIfKnown());
Spliterator<List<Integer>> two = sp.trySplit();
System.out.println(two.getExactSizeIfKnown());
Spliterator<List<Integer>> three = sp.trySplit();
System.out.println(three.getExactSizeIfKnown());
Spliterator<List<Integer>> four = sp.trySplit();
System.out.println(four.getExactSizeIfKnown());
这将打印:
1024
2048
3072
4096
如果您想一次处理 5000
个元素,您需要从具有已知大小的 Spliterator
开始。您可以先将这些分区放入 ArrayList
:
public static void main(String[] args) {
List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 5000);
List<List<Integer>> list = new ArrayList<>();
it.forEach(list::add);
StreamSupport.stream(list.spliterator(), true)
.map(x -> {
System.out.println(
"Thread : " + Thread.currentThread().getName() +
" processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
);
return x;
})
.flatMap(List::stream)
.collect(Collectors.toList());
}
在我的机器上,它显示它们分别由一个线程处理:
Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
Thread : main processed elements in the range : 5000 , 9999