Java Spliterator: How to process large Stream splits equally?
The code I'm using:
package com.skimmer;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;
public class App {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    // Simply creating some 'test' data
    Stream<String> test = LongStream.range(0, 1000000L).mapToObj(i -> i + "-test");
    Spliterator<String> spliterator = test.parallel().spliterator();
    List<Callable<Long>> callableList = new ArrayList<>();

    // Creating a future for each split to process concurrently
    int totalSplits = 0;
    while ((spliterator = spliterator.trySplit()) != null) {
      callableList.add(new Worker(spliterator, "future-" + totalSplits));
      totalSplits++;
    }

    ExecutorService executor = Executors.newFixedThreadPool(totalSplits);
    List<Future<Long>> futures = executor.invokeAll(callableList);
    AtomicLong counter = new AtomicLong(0);
    for (Future<Long> future : futures)
      counter.getAndAdd(future.get());

    System.out.println("Total processed " + counter.get());
    System.out.println("Total splits " + totalSplits);
    executor.shutdown();
  }

  public static class Worker implements Callable<Long> {
    private final Spliterator<String> spliterator;
    private final String name;

    public Worker(Spliterator<String> spliterator, String name) {
      this.spliterator = spliterator;
      this.name = name;
    }

    @Override
    public Long call() {
      AtomicLong counter = new AtomicLong(0);
      spliterator.forEachRemaining(s -> {
        // We'll assume busy processing code here
        counter.getAndIncrement();
      });
      System.out.println(name + " Total processed : " + counter.get());
      return counter.get();
    }
  }
}
Output:
future-11 Total processed : 244
future-10 Total processed : 488
future-9 Total processed : 977
future-12 Total processed : 122
future-7 Total processed : 3906
future-13 Total processed : 61
future-8 Total processed : 1953
future-6 Total processed : 7813
future-14 Total processed : 31
future-5 Total processed : 15625
future-15 Total processed : 15
future-4 Total processed : 31250
future-17 Total processed : 4
future-18 Total processed : 2
future-19 Total processed : 1
future-16 Total processed : 8
future-3 Total processed : 62500
future-2 Total processed : 125000
future-1 Total processed : 250000
future-0 Total processed : 500000
Total processed 1000000
Total splits 20
My problem/question:
The first trySplit (and the future task 'future-0') gets exactly n/2 elements to start processing. The first couple of splits take a very long time to complete, and this gets worse as n grows. Is there any other way to process the stream so that each future/callable gets an equal allocation of elements to process, e.g. N/splits, i.e. 1000000/20 = 50000?
Desired result:
future-11 Total processed : 50000
future-10 Total processed : 50000
future-9 Total processed : 50000
future-12 Total processed : 50000
future-7 Total processed : 50000
future-13 Total processed : 50000
future-8 Total processed : 50000
future-6 Total processed : 50000
future-14 Total processed : 50000
future-5 Total processed : 50000
future-15 Total processed : 50000
future-4 Total processed : 50000
future-17 Total processed : 50000
future-18 Total processed : 50000
future-19 Total processed : 50000
future-16 Total processed : 50000
future-3 Total processed : 50000
future-2 Total processed : 50000
future-1 Total processed : 50000
future-0 Total processed : 50000
Total processed 1000000
Total splits 20
Follow-up question: If a Spliterator cannot do this, what other approach/solution would be best for processing large streams concurrently?
Actual use case: processing a large (6 GB) CSV file that is too big to hold in memory.
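For concreteness, the stream in that scenario would come from reading the file lazily, along these lines (a minimal sketch; the temp-file setup and `countRecords` are illustrative stand-ins, not the real file or processing):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

public class CsvStreamDemo {

  // Files.lines streams the file lazily, so a 6 GB CSV is never held in
  // memory all at once; the resulting Stream<String> is what would be fed
  // to test.parallel().spliterator() in the code above.
  static long countRecords(Path csv) throws IOException {
    try (Stream<String> lines = Files.lines(csv)) {
      return lines.count();
    }
  }

  public static void main(String[] args) throws IOException {
    // Illustrative stand-in for the real 6 GB file
    Path tmp = Files.createTempFile("demo", ".csv");
    Files.write(tmp, List.of("a,1", "b,2", "c,3"));
    System.out.println(countRecords(tmp)); // prints 3
    Files.delete(tmp);
  }
}
```

Note that the spliterator of such a line stream may not report an exact size, so any chunking based on estimateSize is approximate.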
You do get perfectly balanced splits here. The problem is that each time you split a sequence of elements in half (represented by two Spliterator instances), you create a job for one of the halves without even trying to split it further, and only subdivide the other half.
So right after the first split, you create a job covering 500,000 elements. Then you call trySplit on the other 500,000 elements, perfectly splitting them into two chunks of 250,000 elements, create another job covering one 250,000-element chunk, and only try to subdivide the other. And so on. It is your code that creates the unbalanced jobs.
When you change the first part to
// Simply creating some 'test' data
Stream<String> test = LongStream.range(0, 1000000L).mapToObj(i -> i + "-test");
// Creating a callable for each work chunk to process concurrently
// (requires imports for java.util.ArrayDeque and java.util.Deque)
List<Callable<Long>> callableList = new ArrayList<>();
int workChunkTarget = 5000;
Deque<Spliterator<String>> spliterators = new ArrayDeque<>();
spliterators.add(test.parallel().spliterator());
int totalSplits = 0;
while (!spliterators.isEmpty()) {
  Spliterator<String> spliterator = spliterators.pop();
  Spliterator<String> prefix;
  while (spliterator.estimateSize() > workChunkTarget
      && (prefix = spliterator.trySplit()) != null) {
    spliterators.push(spliterator);
    spliterator = prefix;
  }
  totalSplits++;
  callableList.add(new Worker(spliterator, "future-" + totalSplits));
}
you get quite close to your intended target workload size (as close as possible, given that the numbers are not powers of two).
The Spliterator design works much more smoothly with tools like ForkJoinTask, where a new job can be submitted after each successful trySplit, and the job itself decides whether to split and spawn new concurrent jobs while the worker threads are not saturated (which is how parallel stream operations are done in the reference implementation).
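The ForkJoinTask approach described above can be sketched like this (a minimal sketch: the 5,000-element threshold is an arbitrary choice and the counter stands in for real processing):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class SpliteratorTask extends RecursiveTask<Long> {
  private static final long THRESHOLD = 5_000; // arbitrary chunk target
  private final Spliterator<String> spliterator;

  SpliteratorTask(Spliterator<String> spliterator) {
    this.spliterator = spliterator;
  }

  @Override
  protected Long compute() {
    List<SpliteratorTask> forked = new ArrayList<>();
    Spliterator<String> prefix;
    // After every successful trySplit, submit the prefix as a new concurrent
    // job; the forked job will itself decide whether to split further.
    while (spliterator.estimateSize() > THRESHOLD
        && (prefix = spliterator.trySplit()) != null) {
      SpliteratorTask task = new SpliteratorTask(prefix);
      task.fork();
      forked.add(task);
    }
    AtomicLong counter = new AtomicLong(0);
    spliterator.forEachRemaining(s -> counter.getAndIncrement()); // stand-in for real work
    long total = counter.get();
    for (SpliteratorTask task : forked)
      total += task.join();
    return total;
  }

  public static void main(String[] args) {
    Stream<String> test = LongStream.range(0, 1000000L).mapToObj(i -> i + "-test");
    long total = ForkJoinPool.commonPool()
        .invoke(new SpliteratorTask(test.parallel().spliterator()));
    System.out.println("Total processed " + total); // prints "Total processed 1000000"
  }
}
```

Unlike the up-front splitting loop, the pool's work-stealing keeps threads busy without guessing a thread count from the number of splits.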