使用 ForkJoin 和 Streams 构建自适应网格细化
Build an adaptative mesh refinement with ForkJoin and Streams
我想在 3D 中构建自适应网格细化。
基本原理如下:
我有一组具有唯一单元格 ID 的单元格。
我测试每个细胞,看看是否需要细化。
- 如果需要优化,请创建 8 个新的 child 单元格并将它们添加到单元格列表中以检查优化。
- 否则,这是一个叶节点,我将它添加到我的叶节点列表中。
我想使用 ForkJoin 框架和 Java 8 个流来实现它。我读了 this article,但我不知道如何将它应用到我的案例中。
现在,我想到的是:
public class ForkJoinAttempt {
private final double[] cellIds;
public ForkJoinAttempt(double[] cellIds) {
this.cellIds = cellIds;
}
public void refineGrid() {
ForkJoinPool pool = ForkJoinPool.commonPool();
double[] result = pool.invoke(new RefineTask(100));
}
private class RefineTask extends RecursiveTask<double[]> {
final double cellId;
private RefineTask(double cellId) {
this.cellId = cellId;
}
@Override
protected double[] compute() {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.map(ForkJoinTask::join)
.reduce(new double[0], new Concat());
}
}
private double[] refineCell(double cellId) {
double[] result;
if (checkCell()) {
result = new double[8];
for (int i = 0; i < 8; i++) {
result[i] = Math.random();
}
} else {
result = new double[1];
result[0] = cellId;
}
return result;
}
private Collection<RefineTask> createSubtasks() {
List<RefineTask> dividedTasks = new ArrayList<>();
for (int i = 0; i < cellIds.length; i++) {
dividedTasks.add(new RefineTask(cellIds[i]));
}
return dividedTasks;
}
private class Concat implements BinaryOperator<double[]> {
@Override
public double[] apply(double[] a, double[] b) {
int aLen = a.length;
int bLen = b.length;
@SuppressWarnings("unchecked")
double[] c = (double[]) Array.newInstance(a.getClass().getComponentType(), aLen + bLen);
System.arraycopy(a, 0, c, 0, aLen);
System.arraycopy(b, 0, c, aLen, bLen);
return c;
}
}
public boolean checkCell() {
return Math.random() < 0.5;
}
}
...我被困在这里了。
目前这没什么用,因为我从来没有调用过 refineCell
函数。
我创建的所有 double[]
也可能存在性能问题。以这种方式合并它们可能也不是最有效的方式。
但首先,在这种情况下,任何人都可以帮助我实现 fork join 吗?
该算法的预期结果是叶单元 ID 数组 (double[]
)
编辑 1:
多亏了评论,我想出了一些更好用的东西。
一些变化:
- 我从数组转到列表。这对内存占用不利,因为我无法使用 Java 原语。但它使植入更简单。
- 小区 ID 现在是 Long 而不是 Double。
- 不再随机选择 ID:
- 根级单元格的 ID 为 1、2、3 等;
- Children of 1 的 ID 为 10、11、12 等;
- Children of 2 的 ID 为 20、21、22 等;
- 你明白了...
- 我优化了所有ID小于100的单元格
为了这个例子,这让我可以更好地检查结果。
这是新的实现:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class ForkJoinAttempt {
private static final int THRESHOLD = 2;
private List<Long> leafCellIds;
public void refineGrid(List<Long> cellsToProcess) {
leafCellIds = ForkJoinPool.commonPool().invoke(new RefineTask(cellsToProcess));
}
public List<Long> getLeafCellIds() {
return leafCellIds;
}
private class RefineTask extends RecursiveTask<List<Long>> {
private final CopyOnWriteArrayList<Long> cellsToProcess = new CopyOnWriteArrayList<>();
private RefineTask(List<Long> cellsToProcess) {
this.cellsToProcess.addAll(cellsToProcess);
}
@Override
protected List<Long> compute() {
if (cellsToProcess.size() > THRESHOLD) {
System.out.println("Fork/Join");
return ForkJoinTask.invokeAll(createSubTasks())
.stream()
.map(ForkJoinTask::join)
.reduce(new ArrayList<>(), new Concat());
} else {
System.out.println("Direct computation");
List<Long> leafCells = new ArrayList<>();
for (Long cell : cellsToProcess) {
Long result = refineCell(cell);
if (result != null) {
leafCells.add(result);
}
}
return leafCells;
}
}
private Collection<RefineTask> createSubTasks() {
List<RefineTask> dividedTasks = new ArrayList<>();
for (List<Long> list : split(cellsToProcess)) {
dividedTasks.add(new RefineTask(list));
}
return dividedTasks;
}
private Long refineCell(Long cellId) {
if (checkCell(cellId)) {
for (int i = 0; i < 8; i++) {
Long newCell = cellId * 10 + i;
cellsToProcess.add(newCell);
System.out.println("Adding child " + newCell + " to cell " + cellId);
}
return null;
} else {
System.out.println("Leaf node " + cellId);
return cellId;
}
}
private List<List<Long>> split(List<Long> list)
{
int[] index = {0, (list.size() + 1)/2, list.size()};
List<List<Long>> lists = IntStream.rangeClosed(0, 1)
.mapToObj(i -> list.subList(index[i], index[i + 1]))
.collect(Collectors.toList());
return lists;
}
}
private class Concat implements BinaryOperator<List<Long>> {
@Override
public List<Long> apply(List<Long> listOne, List<Long> listTwo) {
return Stream.concat(listOne.stream(), listTwo.stream())
.collect(Collectors.toList());
}
}
public boolean checkCell(Long cellId) {
return cellId < 100;
}
}
以及测试它的方法:
int initialSize = 4;
List<Long> cellIds = new ArrayList<>(initialSize);
for (int i = 0; i < initialSize; i++) {
cellIds.add(Long.valueOf(i + 1));
}
ForkJoinAttempt test = new ForkJoinAttempt();
test.refineGrid(cellIds);
List<Long> leafCellIds = test.getLeafCellIds();
System.out.println("Leaf nodes: " + leafCellIds.size());
for (Long node : leafCellIds) {
System.out.println(node);
}
输出确认它向每个根单元格添加了 8 children。但它并没有更进一步。
我知道为什么,但我不知道如何解决:这是因为尽管 refineCell 方法将新单元格添加到要处理的单元格列表中。 createSubTask 方法没有被再次调用,所以它不知道我添加了新的单元格。
编辑 2:
为了换句话说,我正在寻找一种机制,其中 Queue
单元格 ID 由一些 RecursiveTask
处理,而其他单元格 ID 添加到 Queue
并行
首先,让我们从基于流的解决方案开始
public class Mesh {
public static long[] refineGrid(long[] cellsToProcess) {
return Arrays.stream(cellsToProcess).parallel().flatMap(Mesh::expand).toArray();
}
static LongStream expand(long d) {
return checkCell(d)? LongStream.of(d): generate(d).flatMap(Mesh::expand);
}
private static boolean checkCell(long cellId) {
return cellId > 100;
}
private static LongStream generate(long cellId) {
return LongStream.range(0, 8).map(j -> cellId * 10 + j);
}
}
虽然当前的 flatMap
实施具有 并行处理,当网格太不平衡时可能适用,但实际任务的性能可能是合理的,所以这个简单的解决方案总是值得的尝试一下,然后再开始实施更复杂的东西。
如果您确实需要自定义实现,例如如果工作负载不均衡,Stream实现适配不够好,可以这样:
public class MeshTask extends RecursiveTask<long[]> {
public static long[] refineGrid(long[] cellsToProcess) {
return new MeshTask(cellsToProcess, 0, cellsToProcess.length).compute();
}
private final long[] source;
private final int from, to;
private MeshTask(long[] src, int from, int to) {
source = src;
this.from = from;
this.to = to;
}
@Override
protected long[] compute() {
return compute(source, from, to);
}
private static long[] compute(long[] source, int from, int to) {
long[] result = new long[to - from];
ArrayDeque<MeshTask> next = new ArrayDeque<>();
while(getSurplusQueuedTaskCount()<3) {
int mid = (from+to)>>>1;
if(mid == from) break;
MeshTask task = new MeshTask(source, mid, to);
next.push(task);
task.fork();
to = mid;
}
int pos = 0;
for(; from < to; ) {
long value = source[from++];
if(checkCell(value)) result[pos++]=value;
else {
long[] array = generate(value);
array = compute(array, 0, array.length);
result = Arrays.copyOf(result, result.length+array.length-1);
System.arraycopy(array, 0, result, pos, array.length);
pos += array.length;
}
while(from == to && !next.isEmpty()) {
MeshTask task = next.pop();
if(task.tryUnfork()) {
to = task.to;
}
else {
long[] array = task.join();
int newLen = pos+to-from+array.length;
if(newLen != result.length)
result = Arrays.copyOf(result, newLen);
System.arraycopy(array, 0, result, pos, array.length);
pos += array.length;
}
}
}
return result;
}
static boolean checkCell(long cellId) {
return cellId > 1000;
}
static long[] generate(long cellId) {
long[] sub = new long[8];
for(int i = 0; i < sub.length; i++) sub[i] = cellId*10+i;
return sub;
}
}
此实现直接调用根任务的 compute
方法,将调用线程合并到计算中。 compute
方法使用getSurplusQueuedTaskCount()
来决定是否拆分。正如它的文档所说,这个想法是总是有少量盈余,例如。 3
。这确保了评估可以适应不平衡的工作负载,因为空闲线程可以从其他任务中窃取工作。
拆分不是通过创建两个子任务并等待两者来完成的。相反,只有一个任务被拆分,代表待处理工作的后半部分,当前任务的工作量会调整以反映前半部分。
然后,剩余的工作量在本地处理。之后,弹出最后推送的子任务并尝试 unfork。如果 unforking 成功,则当前工作负载的范围也被调整以覆盖后续任务的范围并且局部迭代继续。
这样,任何没有被另一个线程窃取的多余任务都以最简单、最轻量级的方式处理,就好像它从未被分叉过一样。
如果任务已经被另一个线程拾取,我们现在必须等待它完成并合并结果数组。
请注意,当通过 join()
等待子任务时,底层实现还将检查是否可以取消分叉和本地评估,以保持所有工作线程忙碌。然而,调整我们的循环变量并直接将结果累加到我们的目标数组中仍然比仍然需要合并结果数组的嵌套 compute
调用更好。
如果单元格不是叶子,则生成的节点将由相同的逻辑递归处理。这再次允许自适应本地和并发评估,因此执行将适应不平衡的工作负载,例如如果特定单元格具有更大的子树或特定单元格的评估任务比其他单元格长得多。
必须强调的是,在所有情况下,都需要大量的处理工作量才能从并行处理中获益。如果像示例中那样,主要只是数据复制,那么好处可能会小得多,甚至不存在,或者在最坏的情况下,并行处理的性能可能比顺序处理差。
我想在 3D 中构建自适应网格细化。
基本原理如下:
我有一组具有唯一单元格 ID 的单元格。 我测试每个细胞,看看是否需要细化。
- 如果需要优化,请创建 8 个新的 child 单元格并将它们添加到单元格列表中以检查优化。
- 否则,这是一个叶节点,我将它添加到我的叶节点列表中。
我想使用 ForkJoin 框架和 Java 8 个流来实现它。我读了 this article,但我不知道如何将它应用到我的案例中。
现在,我想到的是:
public class ForkJoinAttempt {
private final double[] cellIds;
public ForkJoinAttempt(double[] cellIds) {
this.cellIds = cellIds;
}
public void refineGrid() {
ForkJoinPool pool = ForkJoinPool.commonPool();
double[] result = pool.invoke(new RefineTask(100));
}
private class RefineTask extends RecursiveTask<double[]> {
final double cellId;
private RefineTask(double cellId) {
this.cellId = cellId;
}
@Override
protected double[] compute() {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.map(ForkJoinTask::join)
.reduce(new double[0], new Concat());
}
}
private double[] refineCell(double cellId) {
double[] result;
if (checkCell()) {
result = new double[8];
for (int i = 0; i < 8; i++) {
result[i] = Math.random();
}
} else {
result = new double[1];
result[0] = cellId;
}
return result;
}
private Collection<RefineTask> createSubtasks() {
List<RefineTask> dividedTasks = new ArrayList<>();
for (int i = 0; i < cellIds.length; i++) {
dividedTasks.add(new RefineTask(cellIds[i]));
}
return dividedTasks;
}
private class Concat implements BinaryOperator<double[]> {
@Override
public double[] apply(double[] a, double[] b) {
int aLen = a.length;
int bLen = b.length;
@SuppressWarnings("unchecked")
double[] c = (double[]) Array.newInstance(a.getClass().getComponentType(), aLen + bLen);
System.arraycopy(a, 0, c, 0, aLen);
System.arraycopy(b, 0, c, aLen, bLen);
return c;
}
}
public boolean checkCell() {
return Math.random() < 0.5;
}
}
...我被困在这里了。
目前这没什么用,因为我从来没有调用过 refineCell
函数。
我创建的所有 double[]
也可能存在性能问题。以这种方式合并它们可能也不是最有效的方式。
但首先,在这种情况下,任何人都可以帮助我实现 fork join 吗?
该算法的预期结果是叶单元 ID 数组 (double[]
)
编辑 1:
多亏了评论,我想出了一些更好用的东西。
一些变化:
- 我从数组转到列表。这对内存占用不利,因为我无法使用 Java 原语。但它使植入更简单。
- 小区 ID 现在是 Long 而不是 Double。
- 不再随机选择 ID:
- 根级单元格的 ID 为 1、2、3 等;
- Children of 1 的 ID 为 10、11、12 等;
- Children of 2 的 ID 为 20、21、22 等;
- 你明白了...
- 我优化了所有ID小于100的单元格
为了这个例子,这让我可以更好地检查结果。
这是新的实现:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class ForkJoinAttempt {
private static final int THRESHOLD = 2;
private List<Long> leafCellIds;
public void refineGrid(List<Long> cellsToProcess) {
leafCellIds = ForkJoinPool.commonPool().invoke(new RefineTask(cellsToProcess));
}
public List<Long> getLeafCellIds() {
return leafCellIds;
}
private class RefineTask extends RecursiveTask<List<Long>> {
private final CopyOnWriteArrayList<Long> cellsToProcess = new CopyOnWriteArrayList<>();
private RefineTask(List<Long> cellsToProcess) {
this.cellsToProcess.addAll(cellsToProcess);
}
@Override
protected List<Long> compute() {
if (cellsToProcess.size() > THRESHOLD) {
System.out.println("Fork/Join");
return ForkJoinTask.invokeAll(createSubTasks())
.stream()
.map(ForkJoinTask::join)
.reduce(new ArrayList<>(), new Concat());
} else {
System.out.println("Direct computation");
List<Long> leafCells = new ArrayList<>();
for (Long cell : cellsToProcess) {
Long result = refineCell(cell);
if (result != null) {
leafCells.add(result);
}
}
return leafCells;
}
}
private Collection<RefineTask> createSubTasks() {
List<RefineTask> dividedTasks = new ArrayList<>();
for (List<Long> list : split(cellsToProcess)) {
dividedTasks.add(new RefineTask(list));
}
return dividedTasks;
}
private Long refineCell(Long cellId) {
if (checkCell(cellId)) {
for (int i = 0; i < 8; i++) {
Long newCell = cellId * 10 + i;
cellsToProcess.add(newCell);
System.out.println("Adding child " + newCell + " to cell " + cellId);
}
return null;
} else {
System.out.println("Leaf node " + cellId);
return cellId;
}
}
private List<List<Long>> split(List<Long> list)
{
int[] index = {0, (list.size() + 1)/2, list.size()};
List<List<Long>> lists = IntStream.rangeClosed(0, 1)
.mapToObj(i -> list.subList(index[i], index[i + 1]))
.collect(Collectors.toList());
return lists;
}
}
private class Concat implements BinaryOperator<List<Long>> {
@Override
public List<Long> apply(List<Long> listOne, List<Long> listTwo) {
return Stream.concat(listOne.stream(), listTwo.stream())
.collect(Collectors.toList());
}
}
public boolean checkCell(Long cellId) {
return cellId < 100;
}
}
以及测试它的方法:
int initialSize = 4;
List<Long> cellIds = new ArrayList<>(initialSize);
for (int i = 0; i < initialSize; i++) {
cellIds.add(Long.valueOf(i + 1));
}
ForkJoinAttempt test = new ForkJoinAttempt();
test.refineGrid(cellIds);
List<Long> leafCellIds = test.getLeafCellIds();
System.out.println("Leaf nodes: " + leafCellIds.size());
for (Long node : leafCellIds) {
System.out.println(node);
}
输出确认它向每个根单元格添加了 8 children。但它并没有更进一步。
我知道为什么,但我不知道如何解决:这是因为尽管 refineCell 方法将新单元格添加到要处理的单元格列表中。 createSubTask 方法没有被再次调用,所以它不知道我添加了新的单元格。
编辑 2:
为了换句话说,我正在寻找一种机制,其中 Queue
单元格 ID 由一些 RecursiveTask
处理,而其他单元格 ID 添加到 Queue
并行
首先,让我们从基于流的解决方案开始
public class Mesh {
public static long[] refineGrid(long[] cellsToProcess) {
return Arrays.stream(cellsToProcess).parallel().flatMap(Mesh::expand).toArray();
}
static LongStream expand(long d) {
return checkCell(d)? LongStream.of(d): generate(d).flatMap(Mesh::expand);
}
private static boolean checkCell(long cellId) {
return cellId > 100;
}
private static LongStream generate(long cellId) {
return LongStream.range(0, 8).map(j -> cellId * 10 + j);
}
}
虽然当前的 flatMap
实施具有
如果您确实需要自定义实现,例如如果工作负载不均衡,Stream实现适配不够好,可以这样:
public class MeshTask extends RecursiveTask<long[]> {
public static long[] refineGrid(long[] cellsToProcess) {
return new MeshTask(cellsToProcess, 0, cellsToProcess.length).compute();
}
private final long[] source;
private final int from, to;
private MeshTask(long[] src, int from, int to) {
source = src;
this.from = from;
this.to = to;
}
@Override
protected long[] compute() {
return compute(source, from, to);
}
private static long[] compute(long[] source, int from, int to) {
long[] result = new long[to - from];
ArrayDeque<MeshTask> next = new ArrayDeque<>();
while(getSurplusQueuedTaskCount()<3) {
int mid = (from+to)>>>1;
if(mid == from) break;
MeshTask task = new MeshTask(source, mid, to);
next.push(task);
task.fork();
to = mid;
}
int pos = 0;
for(; from < to; ) {
long value = source[from++];
if(checkCell(value)) result[pos++]=value;
else {
long[] array = generate(value);
array = compute(array, 0, array.length);
result = Arrays.copyOf(result, result.length+array.length-1);
System.arraycopy(array, 0, result, pos, array.length);
pos += array.length;
}
while(from == to && !next.isEmpty()) {
MeshTask task = next.pop();
if(task.tryUnfork()) {
to = task.to;
}
else {
long[] array = task.join();
int newLen = pos+to-from+array.length;
if(newLen != result.length)
result = Arrays.copyOf(result, newLen);
System.arraycopy(array, 0, result, pos, array.length);
pos += array.length;
}
}
}
return result;
}
static boolean checkCell(long cellId) {
return cellId > 1000;
}
static long[] generate(long cellId) {
long[] sub = new long[8];
for(int i = 0; i < sub.length; i++) sub[i] = cellId*10+i;
return sub;
}
}
此实现直接调用根任务的 compute
方法,将调用线程合并到计算中。 compute
方法使用getSurplusQueuedTaskCount()
来决定是否拆分。正如它的文档所说,这个想法是总是有少量盈余,例如。 3
。这确保了评估可以适应不平衡的工作负载,因为空闲线程可以从其他任务中窃取工作。
拆分不是通过创建两个子任务并等待两者来完成的。相反,只有一个任务被拆分,代表待处理工作的后半部分,当前任务的工作量会调整以反映前半部分。
然后,剩余的工作量在本地处理。之后,弹出最后推送的子任务并尝试 unfork。如果 unforking 成功,则当前工作负载的范围也被调整以覆盖后续任务的范围并且局部迭代继续。
这样,任何没有被另一个线程窃取的多余任务都以最简单、最轻量级的方式处理,就好像它从未被分叉过一样。
如果任务已经被另一个线程拾取,我们现在必须等待它完成并合并结果数组。
请注意,当通过 join()
等待子任务时,底层实现还将检查是否可以取消分叉和本地评估,以保持所有工作线程忙碌。然而,调整我们的循环变量并直接将结果累加到我们的目标数组中仍然比仍然需要合并结果数组的嵌套 compute
调用更好。
如果单元格不是叶子,则生成的节点将由相同的逻辑递归处理。这再次允许自适应本地和并发评估,因此执行将适应不平衡的工作负载,例如如果特定单元格具有更大的子树或特定单元格的评估任务比其他单元格长得多。
必须强调的是,在所有情况下,都需要大量的处理工作量才能从并行处理中获益。如果像示例中那样,主要只是数据复制,那么好处可能会小得多,甚至不存在,或者在最坏的情况下,并行处理的性能可能比顺序处理差。