Trying to use Map.values().parallelStream().forEach(list -> list.sort(comparator)) but get error: "Comparison method violates its general contract!"

Trying to use Map.values().parallelStream().forEach(list -> list.sort(comparator)) but get error: "Comparison method violates its general contract!"

我正在尝试使用多线程对存储在地图中的数组进行排序。有大量记录,约 310 万条,因此当我尝试在单线程 for 循环中对这些记录进行排序时,需要花费很多时间才能完成。我希望尽可能缩短这段时间,最好是在几分钟内(请不要笑!)。

堆栈跟踪:

    Exception in thread "main" java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:562)
    at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:591)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:689)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
    at com.salesforce.process.Process.startProcess(Process.java:51)
    at com.salesforce.process.Schedule.main(Schedule.java:10)
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.base/java.util.TimSort.mergeLo(TimSort.java:781)
    at java.base/java.util.TimSort.mergeAt(TimSort.java:518)
    at java.base/java.util.TimSort.mergeCollapse(TimSort.java:448)
    at java.base/java.util.TimSort.sort(TimSort.java:245)
    at java.base/java.util.Arrays.sort(Arrays.java:1307)
    at java.base/java.util.ArrayList.sort(ArrayList.java:1721)
    at com.salesforce.process.Process.lambda$startProcess[=10=](Process.java:51)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Class 对象:

public class MyObject {
private Integer id;
public String someString;
public Double sortableValue;

... contructors & getters and setters ...
public static Comparator<MyObject> SortableValueComparator = new Comparator<MyObject>() {

    public int compare(MyObject ds1, MyObject ds2) {
       Double sortableValue1 = ds1.getSortableValue();
       Double sortableValue2 = ds2.getSortableValue();
       //descending order      
       if (Double.compare(sortableValue1, sortableValue2) == 0) {
            return 0;
        }
        else if (Double.compare(sortableValue1, sortableValue2) < 0) {
            return -1;
        }
        else {
            return 1;
        }
    }
};

代码:

我正尝试在这样的代码中执行此操作:

Map<String,List<MyObject>> map = new HashMap<String,List<MyObject>>();
// inject 3.1 million keys with List<MyObject> values, with 1-10 items in each list.

map.values().parallelStream().forEach(list -> list.sort(MyObject.SortableValueComparator));

注意:这不是我想要做的,但我最初编写的代码是这样的并且它有效。也就是说,如果我这样做,我的比较器就可以工作。

for (List<MyObject> list : map.values()) {
            Collections.sort(list, MyObject.SortableValueComparator);    
        }

然而,它需要 for.ev.er 才能完成,遗憾的是,这对于我们的业务案例来说是不可接受的。这个新手可以做些什么来使这个 parallelStream() 或某种线程化方法工作?如果您需要更多信息,请告诉我!非常感谢!!

编辑:我也想给你们一个下面的数据样本。 所以这是一个 Map<String,List<MyObject>>.

key (String): "key1", values (List<MyObject>): [{"a",0.0112},{"b",0.12},{"c",0.00512}]
key: "key2", values: [{"d",0.0922},{"a",0.0112},{"f",0.23}]
key: "key3", values: [{"z",0.141},{"w",0.432},{"x",0.0001}]

所以,如果我想对 key3 对象列表进行排序,它们会 return 像这样:

key: "key3", values: [{"w",0.432},{"z",0.141},,{"x",0.0001}]

而且,我想对每条记录执行此排序功能。

最好在抛出异常的地方放置一个断点并检查正在比较的值。然后编写一个单元测试,检查将这些值传递给比较器时会发生什么以及结果如何与相同两个对象上的 'equals' 进行比较。 非常 很可能您的比较器为不属于 'equal' 的对象返回 0 值 - 即 'equals' 在 MyObject 上的实现比较了除排序值。这会在合并集合时导致问题。

因此,设置一个断点,查看哪些值破坏了契约,并在一两次测试中捕获它。一旦你弄明白了,你可能必须添加一些额外的字段(如果你无法控制 'equals' 或者这是现有代码,你不能那样改变)到你的比较器使 'equals' 匹配。

而不是使用

Map.values().parallelStream().forEach(list -> list.sort(comparator))

我用过

Map.values().Stream().forEach(list -> list.sort(comparator))

成功了!