RecursiveTask 结果到 ConcurrentMap

RecursiveTask Results to a ConcurrentMap

我正在尝试创建 RecursiveTask<Map<Short, Long>>

我将这篇文章用作 Reference

public class SearchTask2 extends RecursiveTask<Map<Short, Long>> {

    private final int majorDataThreshold = 16000;

    private ConcurrentNavigableMap<Short, Long> dataMap;
    private long fromRange;
    private long toRange;
    private boolean fromInclusive;
    private boolean toInclusive;

    public SearchTask2(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        System.out.println("SearchTask ::  ");
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }

    @Override
    protected Map<Short, Long> compute() {
        System.out.println("SearchTask :: compute ");
        //Map<Short, Long> result = new HashMap<>();
        int size = dataMap.size();
        if (size > majorDataThreshold + 2500) {
             return ForkJoinTask.invokeAll(createSubtasks()).parallelStream().map(ForkJoinTask::join)
             .collect(Collectors.toConcurrentMap(keyMapper, valueMapper));

            //.forEach(entry -> result.put( entry.getKey(), (Long) entry.getValue()));  
        } 
        return  search();
    }

    private List<SearchTask2> createSubtasks() {
        final short lastKey = dataMap.lastKey();
        final short midkey = (short) (lastKey / 2);
        final short firstKey = dataMap.firstKey();
        final List<SearchTask2> dividedTasks = new ArrayList<>();
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(firstKey, true, midkey, false)), fromRange,
                toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(midkey, true, lastKey, true)), fromRange, toRange,
                fromInclusive, toInclusive));
        return dividedTasks;
    }

    private HashMap<Short,Long> search(){
        //My Search logic for values
        return new HashMap<>();
    }
}

有人可以帮我 keyMapper 和 'valueMapper' 我的结果地图, 我试过了Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())

但它向我显示错误

Cannot infer type argument(s) for <R, A> collect(Collector<? super T,A,R>)

您的 ForkJoinTask::join 正在返回一张地图,因此您有一系列地图。您似乎在期待条目流。您可以使用 flatMap 从映射流获取条目流,如下所示:

return ForkJoinTask.invokeAll(createSubtasks())
    .parallelStream()
    .map(ForkJoinTask::join)
    .flatMap(map -> map.entrySet().stream())   // you were missing this line
    .collect(
        Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
    );

作为一个小改进,您还可以使用方法引用而不是您尝试的 lambda 表达式:

Collectors.toConcurrentMap(Entry::getKey, Entry::getValue)