RecursiveTask 结果到 ConcurrentMap
RecursiveTask Results to a ConcurrentMap
我正在尝试创建 RecursiveTask<Map<Short, Long>>
我将这篇文章用作 Reference
/**
 * Fork/join task that runs {@link #search()} over a key-ordered map, splitting
 * the map into two key ranges whenever it is larger than the split limit and
 * merging the partial results of the two halves into one concurrent map.
 */
public class SearchTask2 extends RecursiveTask<Map<Short, Long>> {

    /** Base size used by the split check in {@link #compute()}. */
    private static final int MAJOR_DATA_THRESHOLD = 16000;

    /** Margin added to the base size; a task forks only above threshold + margin. */
    private static final int SPLIT_MARGIN = 2500;

    private final ConcurrentNavigableMap<Short, Long> dataMap;
    private final long fromRange;
    private final long toRange;
    private final boolean fromInclusive;
    private final boolean toInclusive;

    /**
     * Creates a task over a private, key-sorted copy of {@code dataSource}.
     *
     * @param dataSource    entries to search; copied, never modified
     * @param fromRange     lower bound handed to the search logic
     * @param toRange       upper bound handed to the search logic
     * @param fromInclusive whether {@code fromRange} itself is included
     * @param toInclusive   whether {@code toRange} itself is included
     */
    public SearchTask2(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        System.out.println("SearchTask :: ");
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }

    /**
     * Searches directly when the map is small enough; otherwise forks two
     * subtasks and merges their results.
     *
     * @return the search result for this task's key range
     */
    @Override
    protected Map<Short, Long> compute() {
        System.out.println("SearchTask :: compute ");
        if (dataMap.size() <= MAJOR_DATA_THRESHOLD + SPLIT_MARGIN) {
            return search();
        }
        // join() yields one Map per subtask, so the stream must be flattened
        // into entries before toConcurrentMap can pull keys and values out.
        // The subtasks cover disjoint key ranges, so no merge function is needed.
        return ForkJoinTask.invokeAll(createSubtasks())
                .stream()
                .map(ForkJoinTask::join)
                .flatMap(partial -> partial.entrySet().stream())
                .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Splits the map at the midpoint of its key range into two subtasks:
     * {@code [firstKey, midKey)} and {@code [midKey, lastKey]}.
     *
     * @return the two subtasks, lower key range first
     */
    private List<SearchTask2> createSubtasks() {
        final short firstKey = dataMap.firstKey();
        final short lastKey = dataMap.lastKey();
        // Midpoint of [firstKey, lastKey], computed in int so it cannot overflow.
        // Halving lastKey alone lands outside the key range for negative keys
        // and makes subMap throw IllegalArgumentException.
        final short midKey = (short) ((firstKey + lastKey) >> 1);

        final List<SearchTask2> dividedTasks = new ArrayList<>(2);
        // The constructor copies its argument, so the subMap views can be passed as-is.
        dividedTasks.add(new SearchTask2(dataMap.subMap(firstKey, true, midKey, false), fromRange,
                toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask2(dataMap.subMap(midKey, true, lastKey, true), fromRange,
                toRange, fromInclusive, toInclusive));
        return dividedTasks;
    }

    /**
     * Placeholder for the actual value search over {@code dataMap}.
     *
     * @return the matching entries; currently always an empty map
     */
    private HashMap<Short, Long> search() {
        //My Search logic for values
        return new HashMap<>();
    }
}
有人可以帮我写出 keyMapper
和 valueMapper,用来构建我的结果 Map 吗?
我试过了Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
但它向我显示错误
Cannot infer type argument(s) for <R, A> collect(Collector<? super T,A,R>)
您的 ForkJoinTask::join
返回的是一个 Map,因此您得到的是一个由 Map 组成的流,而 Collectors.toConcurrentMap 需要的是条目(Entry)流。您可以使用 flatMap
从 Map 流获取条目流,如下所示:
// Run both subtasks, then merge their per-task result maps into one concurrent map.
return ForkJoinTask.invokeAll(createSubtasks())
.parallelStream()
// join() returns each subtask's Map, so this is a stream of maps.
.map(ForkJoinTask::join)
// Flatten every map into its entries; this step was missing from the question's code.
.flatMap(map -> map.entrySet().stream())
.collect(
Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
);
作为一个小改进,您还可以使用方法引用而不是您尝试的 lambda 表达式:
Collectors.toConcurrentMap(Entry::getKey, Entry::getValue)
我正在尝试创建 RecursiveTask<Map<Short, Long>>
我将这篇文章用作 Reference
/**
 * Fork/join task that runs {@link #search()} over a key-ordered map, splitting
 * the map into two key ranges whenever it is larger than the split limit and
 * merging the partial results of the two halves into one concurrent map.
 */
public class SearchTask2 extends RecursiveTask<Map<Short, Long>> {

    /** Base size used by the split check in {@link #compute()}. */
    private static final int MAJOR_DATA_THRESHOLD = 16000;

    /** Margin added to the base size; a task forks only above threshold + margin. */
    private static final int SPLIT_MARGIN = 2500;

    private final ConcurrentNavigableMap<Short, Long> dataMap;
    private final long fromRange;
    private final long toRange;
    private final boolean fromInclusive;
    private final boolean toInclusive;

    /**
     * Creates a task over a private, key-sorted copy of {@code dataSource}.
     *
     * @param dataSource    entries to search; copied, never modified
     * @param fromRange     lower bound handed to the search logic
     * @param toRange       upper bound handed to the search logic
     * @param fromInclusive whether {@code fromRange} itself is included
     * @param toInclusive   whether {@code toRange} itself is included
     */
    public SearchTask2(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        System.out.println("SearchTask :: ");
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }

    /**
     * Searches directly when the map is small enough; otherwise forks two
     * subtasks and merges their results.
     *
     * @return the search result for this task's key range
     */
    @Override
    protected Map<Short, Long> compute() {
        System.out.println("SearchTask :: compute ");
        if (dataMap.size() <= MAJOR_DATA_THRESHOLD + SPLIT_MARGIN) {
            return search();
        }
        // join() yields one Map per subtask, so the stream must be flattened
        // into entries before toConcurrentMap can pull keys and values out.
        // The subtasks cover disjoint key ranges, so no merge function is needed.
        return ForkJoinTask.invokeAll(createSubtasks())
                .stream()
                .map(ForkJoinTask::join)
                .flatMap(partial -> partial.entrySet().stream())
                .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Splits the map at the midpoint of its key range into two subtasks:
     * {@code [firstKey, midKey)} and {@code [midKey, lastKey]}.
     *
     * @return the two subtasks, lower key range first
     */
    private List<SearchTask2> createSubtasks() {
        final short firstKey = dataMap.firstKey();
        final short lastKey = dataMap.lastKey();
        // Midpoint of [firstKey, lastKey], computed in int so it cannot overflow.
        // Halving lastKey alone lands outside the key range for negative keys
        // and makes subMap throw IllegalArgumentException.
        final short midKey = (short) ((firstKey + lastKey) >> 1);

        final List<SearchTask2> dividedTasks = new ArrayList<>(2);
        // The constructor copies its argument, so the subMap views can be passed as-is.
        dividedTasks.add(new SearchTask2(dataMap.subMap(firstKey, true, midKey, false), fromRange,
                toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask2(dataMap.subMap(midKey, true, lastKey, true), fromRange,
                toRange, fromInclusive, toInclusive));
        return dividedTasks;
    }

    /**
     * Placeholder for the actual value search over {@code dataMap}.
     *
     * @return the matching entries; currently always an empty map
     */
    private HashMap<Short, Long> search() {
        //My Search logic for values
        return new HashMap<>();
    }
}
有人可以帮我写出 keyMapper
和 valueMapper,用来构建我的结果 Map 吗?
我试过了Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
但它向我显示错误
Cannot infer type argument(s) for <R, A> collect(Collector<? super T,A,R>)
您的 ForkJoinTask::join
返回的是一个 Map,因此您得到的是一个由 Map 组成的流,而 Collectors.toConcurrentMap 需要的是条目(Entry)流。您可以使用 flatMap
从 Map 流获取条目流,如下所示:
// Run both subtasks, then merge their per-task result maps into one concurrent map.
return ForkJoinTask.invokeAll(createSubtasks())
.parallelStream()
// join() returns each subtask's Map, so this is a stream of maps.
.map(ForkJoinTask::join)
// Flatten every map into its entries; this step was missing from the question's code.
.flatMap(map -> map.entrySet().stream())
.collect(
Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
);
作为一个小改进,您还可以使用方法引用而不是您尝试的 lambda 表达式:
Collectors.toConcurrentMap(Entry::getKey, Entry::getValue)