如何实现线程安全的收集器?

How to implement a thread-safe Collector?

我想要类似于 Collectors.maxBy() 的东西,一个获取集合中顶部元素的收集器(maxBy 只获取一个)。

我有一个 Possibility 对象流,可以使用 Integer score(Possibility) 方法对其进行评分。

首先我尝试了:

List<Possibity> possibilities = getPossibilityStream()
    .parallel()
    .collect(Collectors.toList());

if(!possibilities.isEmpty()) {
    int bestScore = possibilities.stream()
        .mapToInt(p -> score(p))
        .max()
        .getAsInt();
    possibilities = possibilities.stream()
        .filter(p -> score(p)==bestScore)
        .collect(Collectors.toList());
}

但是这样做,我扫描了该集合 3 次。一次构建它,第二次获得最高分,第三次过滤它,这不是最优的。此外,可能性的数量可能很大(>1012)。

最好的方法应该是在第一次收集中直接获取最高可能性,但似乎没有内置的收集器可以做这样的事情。

所以我实现了自己的 Collector:

public class BestCollector<E> implements Collector<E, List<E>, List<E>> {

    private final Comparator<E> comparator;

    private final Class<? extends List> listImpl ;

    public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
        this.comparator = comparator;
        this.listImpl = listImpl;
    }

    public BestCollector(Comparator<E> comparator) {
        this.comparator= comparator;
        listImpl = ArrayList.class;
    }

    @Override
    public Supplier<List<E>> supplier() {
        return () -> {
            try {
                return listImpl.newInstance();
            } catch (InstantiationException | IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    @Override
    public BiConsumer<List<E>, E> accumulator() {
        return (list, e) -> {
            if (list.isEmpty()) {
                list.add(e);
            } else {
                final int comparison = comparator.compare(list.get(0), e);
                if (comparison == 0) {
                    list.add(e);
                } else if (comparison < 0) {
                    list.clear();
                    list.add(e);
                }
            }
        };
    }

    @Override
    public BinaryOperator<List<E>> combiner() {
        return (l1, l2) -> {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        };
    }

    @Override
    public Function<List<E>, List<E>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
    }
}

然后:

List<Possibity> possibilities = getPossibilityStream()
    .parallel()
    .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2)));

这在顺序模式下完成工作(没有 .parallel()),但在并行模式下偶尔会在两个地方出现一些异常:

accumulator() 方法

我知道在 list.isEmpty()list.get(0) 之间调用 list.clear() 时会发生这种情况。

我不明白 list.get(0) 怎么会 return null...

在并行模式下,有时 list.get(0) 引发 IndexOutOfBoundsException 有时 return null.

我知道我的代码不是线程安全的,所以我尝试了几种解决方案:

但是我不知道这里的Characteristics.CONCURRENT是否表示我的代码是线程安全的,或者我的代码将用于并发处理。

但是 none 这些解决方案实际上解决了问题。


事实上,当我从特性中删除 CONCURRENT 时,有时会有 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 但在行中:

final int comparison = comparator.compare(l1.get(0), l2.get(0));

combiner() 方法。

但是,accumulator() 方法引发的异常似乎不再发生。


@Holger 的回答是正确的。

完整的解决方案是同时更改 combiner()characteristics() 方法:

@Override
public BinaryOperator<List<E>> combiner() {
    return (l1, l2) -> {
        if (l1.isEmpty()) {
            return l2;
        } else if (l2.isEmpty()) {
            return l1;
        } else {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        }
    };
}

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}

您的代码只有一个重大错误:如果您的收集器不是线程安全的,它不应该报告 Characteristics.CONCURRENT 因为这恰恰是在声称它是线程安全的。

您必须了解的重要一点是,对于非 CONCURRENT 收集器,框架将执行必要的步骤以线程安全但仍然有效的方式使用它:

  • 对于每个工作线程,将通过 supplier()
  • 获取一个新容器
  • 每个 worker 将使用 accumulator() 函数及其自己的本地容器
  • 两个工作线程完成工作后将使用combiner()
  • finisher() 将在 所有 工作线程完成其工作并且所有容器已合并时使用

因此,您所要做的就是确保您的供应商在每次调用时确实 returns 一个新实例,并且所有功能都是无干扰且无副作用的(除了容器之外的任何其他功能)作为参数接收),当然,当您的收集器不是并发收集器时,不报告 Characteristics.CONCURRENT

此处不需要 synchronized 关键字或并发集合。


顺便说一下,(p1, p2) -> score(p1).compareTo(score(p2)) 形式的 Comparator 可以使用 Comparator.comparing(p -> score(p)) 实现,或者如果分值是 intComparator.comparingInt(p -> score(p)) .


最后,您的组合器函数不会检查其中一个列表是否为空。这完美地解释了 combiner 中的 IndexOutOfBoundsExceptionaccumulator 中的 IndexOutOfBoundsException 是您的收集器报告的结果 Characteristics.CONCURRENT


了解向 accumulator()combiner() 方法添加 synchronized 关键字并不能保护通过 lambda 表达式构造的函数也很重要。它将保护 构造 函数实例的方法,而不是函数的代码本身。与内部 class 相比,无法在实际函数的实现方法中添加 synchronized 关键字。