并行迭代大哈希图

Iterate big hashmap in parallel

我有一个链接的哈希图,最多可包含多达 300k 条记录。我想并行迭代此地图以提高性能。该函数遍历向量映射并找到给定向量与映射中所有向量的点积。还要根据日期值再进行一次检查。函数 returns 是一个嵌套的 hashmap。 T

这是使用迭代器的代码:

public HashMap<String,HashMap<String,Double>> function1(String key, int days) {
    LocalDate date = LocalDate.now().minusDays(days);
    HashMap<String,Double> ret = new HashMap<>();
    HashMap<String,Double> ret2 = new HashMap<>();
    OpenMapRealVector v0 = map.get(key).value;
    for(Map.Entry<String, FixedTimeHashMap<OpenMapRealVector>> e: map.entrySet()) {
        if(!e.getKey().equals(key)) {
            Double d = v0.dotProduct(e.getValue().value);
            d = Double.parseDouble(new DecimalFormat("###.##").format(d));
            ret.put(e.getKey(),d);
            if(e.getValue().date.isAfter(date)){
                ret2.put(e.getKey(),d);
            }
        }
    }
    HashMap<String,HashMap<String,Double>> result = new HashMap<>();
    result.put("dot",ret);
    result.put("anomaly",ret2);
    return result;
}

更新: 我调查了 Java 8 个流,但我 运行 在使用并行流时正在研究 CastException 和 Null 指针异常,因为这个映射正在其他地方修改。

代码:

public HashMap<String,HashMap<String,Double>> function1(String key, int days) {
    LocalDate date = LocalDate.now().minusDays(days);
    HashMap<String,Double> ret = new HashMap<>();
    HashMap<String,Double> ret2 = new HashMap<>();
    OpenMapRealVector v0 = map.get(key).value;
    synchronized (map) {
        map.entrySet().parallelStream().forEach(e -> {
            if(!e.getKey().equals(key)) {
                Double d = v0.dotProduct(e.getValue().value);
                d = Double.parseDouble(new DecimalFormat("###.##").format(d));
                ret.put(e.getKey(),d);
                if(e.getValue().date.isAfter(date)) {
                    ret2.put(e.getKey(),d);
                }
            }
        });
    }
}

我已经同步了地图的使用,但是还是报错如下:

java.util.concurrent.ExecutionException: java.lang.ClassCastException
Caused by: java.lang.ClassCastException
Caused by: java.lang.ClassCastException: java.util.HashMap$Node cannot be cast to java.util.HashMap$TreeNode

此外,我在想我是否应该将地图拆分成多个部分并 运行 每个部分并行使用不同的线程?

您需要从地图中检索 Set<Map.Entry<K, V>>

以下是如何在 Java8 中使用并行流在地图上进行迭代:

Map<String, String> myMap = new HashMap<> ();
myMap.entrySet ()
    .parallelStream ()
    .forEach (entry -> {
        String key = entry.getKey ();
        String value = entry.getValue ();
        // here add whatever processing you wanna do using the key / value retrieved
        // ret.put (....);
        // ret2.put (....)
    });

澄清:

映射 retret2 应声明为 ConcurrentHashMaps 以允许来自多个线程的并发插入/更新。

因此 2 个映射的声明变为:

Map<String,Double> ret = new ConcurrentHashMap<> ();
Map<String,Double> ret2 = new ConcurrentHashMap<> ();

使用 Java 8 的一个可能的解决方案是,

Map<String, Double> dotMap = map.entrySet().stream().filter(e -> !e.getKey().equals(key))
        .collect(Collectors.toMap(Map.Entry::getKey, e -> Double
                .parseDouble(new DecimalFormat("###.##").format(v0.dotProduct(e.getValue().value)))));
Map<String, Double> anomalyMap = map.entrySet().stream().filter(e -> !e.getKey().equals(key))
        .filter(e -> e.getValue().date.isAfter(date))
        .collect(Collectors.toMap(Map.Entry::getKey, e -> Double
                .parseDouble(new DecimalFormat("###.##").format(v0.dotProduct(e.getValue().value)))));
result.put("dot", dotMap);
result.put("anomaly", anomalyMap);

更新

这里有更优雅的解决方案,

Map<String, Map<String, Double>> resultMap = map.entrySet().stream().filter(e -> !e.getKey().equals(key))
        .collect(Collectors.groupingBy(e -> e.getValue().date.isAfter(date) ? "anomaly" : "dot",
                Collectors.toMap(Map.Entry::getKey, e -> Double.parseDouble(
                        new DecimalFormat("###.##").format(v0.dotProduct(e.getValue().value))))));

这里我们首先根据异常或点对它们进行分组,然后使用下游 Collector 为每个组创建一个 Map。我还根据以下建议更新了 .filter() 标准。