Java 8 stream combiner never called

I'm writing a custom Java 8 collector that is supposed to compute the average of POJOs that have a getValue() method. Here is the code:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

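Everything works fine in the non-parallel case. However, when I use parallelStream(), it sometimes doesn't work. For example, given the values 1 to 10, it computes 53/9 instead of 55/10. When debugging, the debugger never hits the breakpoint in the combiner() function. Is there some kind of flag that I need to set?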

Well, that is exactly what you asked for when you specified Characteristics.CONCURRENT:

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

If that is not the case, as with your Collector, you should not specify that flag.


As a side note, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); is a rather inefficient way to specify the characteristics. You can simply use EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED). Once you remove the wrong CONCURRENT characteristic, you can use either EnumSet.of(Characteristics.UNORDERED) or Collections.singleton(Characteristics.UNORDERED), but a HashSet is definitely overkill.
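To make the suggestion concrete, here is a minimal sketch of the same collector declared with only the UNORDERED characteristic, written with Collector.of instead of an anonymous class. The nested BoltAggregationData class is a hypothetical stand-in for the question's POJO (only getValue() is assumed), and the class and method names are made up for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AvgCollectorSketch {

    // Hypothetical stand-in for the question's POJO; only getValue() is assumed.
    static final class BoltAggregationData {
        private final BigDecimal value;
        BoltAggregationData(BigDecimal value) { this.value = value; }
        BigDecimal getValue() { return value; }
    }

    // Same supplier/accumulator/combiner/finisher logic as in the question,
    // but without CONCURRENT, so each thread gets its own BigDecimal[] container.
    static final Collector<BoltAggregationData, BigDecimal[], BigDecimal> AVG_COLLECTOR =
        Collector.of(
            () -> new BigDecimal[] { BigDecimal.ZERO, BigDecimal.ZERO },
            (a, b) -> { a[0] = a[0].add(b.getValue()); a[1] = a[1].add(BigDecimal.ONE); },
            (a, b) -> { a[0] = a[0].add(b[0]); a[1] = a[1].add(b[1]); return a; },
            a -> a[0].divide(a[1], 6, RoundingMode.HALF_UP),
            Collector.Characteristics.UNORDERED);

    // Averages the values 1..10 over a parallel stream.
    static BigDecimal averageOfOneToTen() {
        List<BoltAggregationData> data = IntStream.rangeClosed(1, 10)
            .mapToObj(i -> new BoltAggregationData(BigDecimal.valueOf(i)))
            .collect(Collectors.toList());
        return data.parallelStream().collect(AVG_COLLECTOR);
    }

    public static void main(String[] args) {
        System.out.println(averageOfOneToTen()); // prints 5.500000 (55/10 at scale 6)
    }
}
```

Like the original, this sketch divides by the element count in the finisher, so it would throw an ArithmeticException for an empty stream.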

The problem seems to be the CONCURRENT characteristic, which does something other than what you might expect:

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

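Instead of calling the combiner, the accumulator is called concurrently, with all threads using the same BigDecimal[] a. Access to a is not atomic, so things go wrong: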

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

The value of a[0] ends up as 7, whereas it should be 10. The same kind of thing can happen with a[1], so the results can be inconsistent.
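The interleaving above can be replayed step by step. The following is only a single-threaded sketch that performs the reads and writes in the same order as the trace, so it is deterministic; it is not a real multi-threaded reproduction, and the class, method, and variable names are invented for illustration.

```java
import java.math.BigDecimal;

class LostUpdateReplay {

    // Replays the read/add/write steps of the trace in order, on one thread.
    static BigDecimal replay() {
        BigDecimal[] a = { new BigDecimal(3), BigDecimal.ZERO }; // a[0] starts at 3

        BigDecimal readByThread1 = a[0];                 // Thread1 reads 3
        BigDecimal readByThread2 = a[0];                 // Thread2 reads 3
        BigDecimal sum1 = readByThread1.add(new BigDecimal(3)); // 3 + 3 = 6
        BigDecimal sum2 = readByThread2.add(new BigDecimal(4)); // 3 + 4 = 7
        a[0] = sum1;                                     // Thread1 writes 6
        a[0] = sum2;                                     // Thread2 writes 7, overwriting the 6

        return a[0];
    }

    public static void main(String[] args) {
        // The correct total would be 3 + 3 + 4 = 10; Thread1's update is lost.
        System.out.println(replay()); // prints 7
    }
}
```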


If you remove the CONCURRENT characteristic, the combiner will be used instead.
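One way to check this without a debugger is to count combiner invocations. The sketch below is a simplified variant that averages plain BigDecimal values rather than the question's POJO; the class name, the counter, and the method name are assumptions made for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CombinerCallCount {

    // Atomic counter, because partial results may be merged on different threads.
    static final AtomicInteger COMBINER_CALLS = new AtomicInteger();

    static BigDecimal averageInParallel(List<BigDecimal> values) {
        Collector<BigDecimal, BigDecimal[], BigDecimal> avg = Collector.of(
            () -> new BigDecimal[] { BigDecimal.ZERO, BigDecimal.ZERO },
            (a, v) -> { a[0] = a[0].add(v); a[1] = a[1].add(BigDecimal.ONE); },
            (a, b) -> {
                COMBINER_CALLS.incrementAndGet(); // record each merge of two partial results
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            },
            a -> a[0].divide(a[1], 6, RoundingMode.HALF_UP),
            Collector.Characteristics.UNORDERED); // no CONCURRENT
        return values.parallelStream().collect(avg);
    }

    public static void main(String[] args) {
        List<BigDecimal> values = IntStream.rangeClosed(1, 10)
            .mapToObj(i -> BigDecimal.valueOf(i))
            .collect(Collectors.toList());
        System.out.println("average = " + averageInParallel(values));
        // The exact count depends on how the stream is split, so it is not fixed.
        System.out.println("combiner calls = " + COMBINER_CALLS.get());
    }
}
```

The average should come out as 55/10 regardless of how the work is split, while the number of combiner calls may vary between runs and machines.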