如何实现线程安全的收集器?
How to implement a thread-safe Collector?
我想要类似于 Collectors.maxBy()
的东西,一个获取集合中顶部元素的收集器(maxBy
只获取一个)。
我有一个 Possibility
对象流,可以使用 Integer score(Possibility)
方法对其进行评分。
首先我尝试了:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(Collectors.toList());
if(!possibilities.isEmpty()) {
int bestScore = possibilities.stream()
.mapToInt(p -> score(p))
.max()
.getAsInt();
possibilities = possibilities.stream()
.filter(p -> score(p)==bestScore)
.collect(Collectors.toList());
}
但是这样做,我扫描了该集合 3 次。一次构建它,第二次获得最高分,第三次过滤它,这不是最优的。此外,可能性的数量可能很大(>1012)。
最好的方法应该是在第一次收集中直接获取最高可能性,但似乎没有内置的收集器可以做这样的事情。
所以我实现了自己的 Collector
:
public class BestCollector<E> implements Collector<E, List<E>, List<E>> {
private final Comparator<E> comparator;
private final Class<? extends List> listImpl ;
public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
this.comparator = comparator;
this.listImpl = listImpl;
}
public BestCollector(Comparator<E> comparator) {
this.comparator= comparator;
listImpl = ArrayList.class;
}
@Override
public Supplier<List<E>> supplier() {
return () -> {
try {
return listImpl.newInstance();
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
};
}
@Override
public BiConsumer<List<E>, E> accumulator() {
return (list, e) -> {
if (list.isEmpty()) {
list.add(e);
} else {
final int comparison = comparator.compare(list.get(0), e);
if (comparison == 0) {
list.add(e);
} else if (comparison < 0) {
list.clear();
list.add(e);
}
}
};
}
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
};
}
@Override
public Function<List<E>, List<E>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
}
}
然后:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2)));
这在顺序模式下完成工作(没有 .parallel()
),但在并行模式下偶尔会在两个地方出现一些异常:
行中有一个java.lang.IndexOutOfBoundsException Index: 0, Size: 0
:
final int comparison = comparator.compare(list.get(0), e);
accumulator()
方法
我知道在 list.isEmpty()
和 list.get(0)
之间调用 list.clear()
时会发生这种情况。
score(Possibility)方法中的一个java.lang.NullPointerException
因为可能性是null
。再次涉及同一行:
final int comparison = comparator.compare(list.get(0), e);
我不明白 list.get(0)
怎么会 return null
...
在并行模式下,有时 list.get(0)
引发 IndexOutOfBoundsException
有时 return null
.
我知道我的代码不是线程安全的,所以我尝试了几种解决方案:
- 在BestCollector的所有方法中添加
synchronized
:public synchronized …
- 使用线程安全集合而不是
ArrayList
:java.util.concurrent.CopyOnWriteArrayList
- 添加
synchronized
并同时使用CopyOnWriteArrayList
从characteristics()
方法Set<Characteristics>
中删除Characteristics.CONCURRENT
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
但是我不知道这里的Characteristics.CONCURRENT
是否表示我的代码是线程安全的,或者我的代码将用于并发处理。
但是 none 这些解决方案实际上解决了问题。
事实上,当我从特性中删除 CONCURRENT 时,有时会有 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
但在行中:
final int comparison = comparator.compare(l1.get(0), l2.get(0));
combiner()
方法。
但是,accumulator()
方法引发的异常似乎不再发生。
@Holger 的回答是正确的。
完整的解决方案是同时更改 combiner()
和 characteristics()
方法:
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
if (l1.isEmpty()) {
return l2;
} else if (l2.isEmpty()) {
return l1;
} else {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
}
};
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
您的代码只有一个重大错误:如果您的收集器不是线程安全的,它不应该报告 Characteristics.CONCURRENT
因为这恰恰是在声称它是线程安全的。
您必须了解的重要一点是,对于非 CONCURRENT
收集器,框架将执行必要的步骤以线程安全但仍然有效的方式使用它:
- 对于每个工作线程,将通过
supplier()
获取一个新容器
- 每个 worker 将使用
accumulator()
函数及其自己的本地容器
- 两个工作线程完成工作后将使用
combiner()
finisher()
将在 所有 工作线程完成其工作并且所有容器已合并时使用
因此,您所要做的就是确保您的供应商在每次调用时确实 returns 一个新实例,并且所有功能都是无干扰且无副作用的(除了容器之外的任何其他功能)作为参数接收),当然,当您的收集器不是并发收集器时,不报告 Characteristics.CONCURRENT
。
此处不需要 synchronized
关键字或并发集合。
顺便说一下,(p1, p2) -> score(p1).compareTo(score(p2))
形式的 Comparator
可以使用 Comparator.comparing(p -> score(p))
实现,或者如果分值是 int
:Comparator.comparingInt(p -> score(p))
.
最后,您的组合器函数不会检查其中一个列表是否为空。这完美地解释了 combiner
中的 IndexOutOfBoundsException
而 accumulator
中的 IndexOutOfBoundsException
是您的收集器报告的结果 Characteristics.CONCURRENT
…
了解向 accumulator()
或 combiner()
方法添加 synchronized
关键字并不能保护通过 lambda 表达式构造的函数也很重要。它将保护 构造 函数实例的方法,而不是函数的代码本身。与内部 class 相比,无法在实际函数的实现方法中添加 synchronized
关键字。
我想要类似于 Collectors.maxBy()
的东西,一个获取集合中顶部元素的收集器(maxBy
只获取一个)。
我有一个 Possibility
对象流,可以使用 Integer score(Possibility)
方法对其进行评分。
首先我尝试了:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(Collectors.toList());
if(!possibilities.isEmpty()) {
int bestScore = possibilities.stream()
.mapToInt(p -> score(p))
.max()
.getAsInt();
possibilities = possibilities.stream()
.filter(p -> score(p)==bestScore)
.collect(Collectors.toList());
}
但是这样做,我扫描了该集合 3 次。一次构建它,第二次获得最高分,第三次过滤它,这不是最优的。此外,可能性的数量可能很大(>1012)。
最好的方法应该是在第一次收集中直接获取最高可能性,但似乎没有内置的收集器可以做这样的事情。
所以我实现了自己的 Collector
:
public class BestCollector<E> implements Collector<E, List<E>, List<E>> {
private final Comparator<E> comparator;
private final Class<? extends List> listImpl ;
public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
this.comparator = comparator;
this.listImpl = listImpl;
}
public BestCollector(Comparator<E> comparator) {
this.comparator= comparator;
listImpl = ArrayList.class;
}
@Override
public Supplier<List<E>> supplier() {
return () -> {
try {
return listImpl.newInstance();
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
};
}
@Override
public BiConsumer<List<E>, E> accumulator() {
return (list, e) -> {
if (list.isEmpty()) {
list.add(e);
} else {
final int comparison = comparator.compare(list.get(0), e);
if (comparison == 0) {
list.add(e);
} else if (comparison < 0) {
list.clear();
list.add(e);
}
}
};
}
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
};
}
@Override
public Function<List<E>, List<E>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
}
}
然后:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2)));
这在顺序模式下完成工作(没有 .parallel()
),但在并行模式下偶尔会在两个地方出现一些异常:
行中有一个
java.lang.IndexOutOfBoundsException Index: 0, Size: 0
:final int comparison = comparator.compare(list.get(0), e);
accumulator()
方法
我知道在 list.isEmpty()
和 list.get(0)
之间调用 list.clear()
时会发生这种情况。
score(Possibility)方法中的一个
java.lang.NullPointerException
因为可能性是null
。再次涉及同一行:final int comparison = comparator.compare(list.get(0), e);
我不明白 list.get(0)
怎么会 return null
...
在并行模式下,有时 list.get(0)
引发 IndexOutOfBoundsException
有时 return null
.
我知道我的代码不是线程安全的,所以我尝试了几种解决方案:
- 在BestCollector的所有方法中添加
synchronized
:public synchronized …
- 使用线程安全集合而不是
ArrayList
:java.util.concurrent.CopyOnWriteArrayList
- 添加
synchronized
并同时使用CopyOnWriteArrayList
从
characteristics()
方法Set<Characteristics>
中删除Characteristics.CONCURRENT
@Override public Set<Characteristics> characteristics() { return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED); }
但是我不知道这里的Characteristics.CONCURRENT
是否表示我的代码是线程安全的,或者我的代码将用于并发处理。
但是 none 这些解决方案实际上解决了问题。
事实上,当我从特性中删除 CONCURRENT 时,有时会有 java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
但在行中:
final int comparison = comparator.compare(l1.get(0), l2.get(0));
combiner()
方法。
但是,accumulator()
方法引发的异常似乎不再发生。
@Holger 的回答是正确的。
完整的解决方案是同时更改 combiner()
和 characteristics()
方法:
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
if (l1.isEmpty()) {
return l2;
} else if (l2.isEmpty()) {
return l1;
} else {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
}
};
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
您的代码只有一个重大错误:如果您的收集器不是线程安全的,它不应该报告 Characteristics.CONCURRENT
因为这恰恰是在声称它是线程安全的。
您必须了解的重要一点是,对于非 CONCURRENT
收集器,框架将执行必要的步骤以线程安全但仍然有效的方式使用它:
- 对于每个工作线程,将通过
supplier()
获取一个新容器
- 每个 worker 将使用
accumulator()
函数及其自己的本地容器 - 两个工作线程完成工作后将使用
combiner()
finisher()
将在 所有 工作线程完成其工作并且所有容器已合并时使用
因此,您所要做的就是确保您的供应商在每次调用时确实 returns 一个新实例,并且所有功能都是无干扰且无副作用的(除了容器之外的任何其他功能)作为参数接收),当然,当您的收集器不是并发收集器时,不报告 Characteristics.CONCURRENT
。
此处不需要 synchronized
关键字或并发集合。
顺便说一下,(p1, p2) -> score(p1).compareTo(score(p2))
形式的 Comparator
可以使用 Comparator.comparing(p -> score(p))
实现,或者如果分值是 int
:Comparator.comparingInt(p -> score(p))
.
最后,您的组合器函数不会检查其中一个列表是否为空。这完美地解释了 combiner
中的 IndexOutOfBoundsException
而 accumulator
中的 IndexOutOfBoundsException
是您的收集器报告的结果 Characteristics.CONCURRENT
…
了解向 accumulator()
或 combiner()
方法添加 synchronized
关键字并不能保护通过 lambda 表达式构造的函数也很重要。它将保护 构造 函数实例的方法,而不是函数的代码本身。与内部 class 相比,无法在实际函数的实现方法中添加 synchronized
关键字。