收集器的组合器功能是否可以用于顺序流?
Can a Collector's combiner function ever be used on sequential streams?
示例程序:
public final class CollectorTest
{
private CollectorTest()
{
}
private static <T> BinaryOperator<T> nope()
{
return (t, u) -> { throw new UnsupportedOperationException("nope"); };
}
public static void main(final String... args)
{
final Collector<Integer, ?, List<Integer>> c
= Collector.of(ArrayList::new, List::add, nope());
IntStream.range(0, 10_000_000).boxed().collect(c);
}
}
所以,为了简化这里的事情,没有最终的转换,所以生成的代码非常简单。
现在,IntStream.range()
生成顺序流。我只是将结果装入 Integer
中,然后我设计的 Collector
将它们收集到 List<Integer>
中。很简单。
而且无论我 运行 这个示例程序多少次,UnsupportedOperationException
永远不会命中,这意味着我的虚拟组合器永远不会被调用。
我有点预料到这一点,但后来我对流的误解已经够多了,我不得不问这个问题...
当流 保证 是连续的时,是否可以调用 Collector
的组合器?
正如@MarkoTopolnik 和@Duncan 在之前的评论中所观察到的那样,不能保证在顺序模式下调用 Collector.combiner()
会产生减少的结果。事实上,Java 文档在这一点上有点主观,可能导致不恰当的解释。
(...) A parallel implementation would partition the input, create a result container for each partition, accumulate the contents of each partition into a subresult for that partition, and then use the combiner function to merge the subresults into a combined result.
根据 NoBlogDefFound 组合器仅在并行模式下使用。部分引述如下:
combiner() is used to join two accumulators together into one. It is used when collector is executed in parallel, splitting input Stream and collecting parts independently first.
为了更清楚地说明这个问题,我重写了第一段代码并采用了两种方法(串行和并行)。
public final class CollectorTest
{
private CollectorTest()
{
}
private static <T> BinaryOperator<T> nope()
{
return (t, u) -> { throw new UnsupportedOperationException("nope"); };
}
public static void main(final String... args)
{
final Collector<Integer, ?, List<Integer>> c =
Collector
.of(ArrayList::new, List::add, nope());
// approach sequential
Stream<Integer> sequential = IntStream
.range(0, 10_000_000)
.boxed();
System.out.println("isParallel:" + sequential.isParallel());
sequential
.collect(c);
// approach parallel
Stream<Integer> parallel = IntStream
.range(0, 10_000_000)
.parallel()
.boxed();
System.out.println("isParallel:" + parallel.isParallel());
parallel
.collect(c);
}
}
在运行这段代码之后我们可以得到输出:
isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
at com.Whosebug.lambda.CollectorTest.lambda$nope[=11=](CollectorTest.java:18)
at com.Whosebug.lambda.CollectorTest$$Lambda/2001049719.apply(Unknown Source)
at java.util.stream.ReduceOpsReducingSink.combine(ReduceOps.java:174)
at java.util.stream.ReduceOpsReducingSink.combine(ReduceOps.java:160)
所以,根据这个结果我们可以推断Collector's combiner
只能被并行执行调用。
仔细阅读 ReduceOps.java 中的流实现代码会发现,组合函数仅在 ReduceTask
完成时调用,并且 ReduceTask
实例仅在评估管道时使用在平行下。因此,在当前的实现中, 在评估顺序管道时永远不会调用组合器。
但是,规范中没有任何内容可以保证这一点。 Collector
是一个对其实现有要求的接口,并且没有为顺序流授予豁免。就个人而言,我很难想象为什么顺序流水线评估可能需要调用组合器,但比我更有想象力的人可能会找到它的巧妙用途,并实现它。规范允许,即使今天的实现没有做到,你还是要考虑一下。
这应该不足为奇。流API的设计中心是支持与顺序执行平等的并行执行。当然,程序可以观察到它是顺序执行还是并行执行。但是 API 的设计是为了支持一种允许任一种编程风格。
如果您正在编写收集器并且发现不可能(或不方便或困难)编写关联组合器函数,导致您想要将流限制为顺序执行,也许这意味着您朝着错误的方向前进。是时候退后一步,考虑以不同的方式解决问题了。
不需要关联组合器函数的常见 reduction-style 操作称为 fold-left。主要特点是严格应用折叠功能 left-to-right,一次进行一个。我不知道并行化 fold-left.
的方法
当人们试图以我们一直在谈论的方式扭曲收藏家时,他们通常会寻找类似 fold-left 的东西。 Streams API 没有对此操作的直接 API 支持,但它很容易编写。例如,假设您想使用以下操作减少字符串列表:重复第一个字符串,然后追加第二个。很容易证明这个操作不是关联的:
List<String> list = Arrays.asList("a", "b", "c", "d", "e");
System.out.println(list.stream()
.collect(StringBuilder::new,
(a, b) -> a.append(a.toString()).append(b),
(a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE
运行 依次产生所需的输出:
aabaabcaabaabcdaabaabcaabaabcde
但是当运行并行时,它可能会产生这样的结果:
aabaabccdde
因为它是 "works" 顺序的,我们可以通过调用 sequential()
来强制执行它,并通过让组合器抛出异常来支持它。此外,供应商必须恰好被调用一次。没有办法合并中间结果,所以如果供应商被调用两次,我们就已经有麻烦了。但是由于我们 "know" 供应商在顺序模式下只被调用一次,所以大多数人并不担心这一点。事实上,我看到有人写 "suppliers" return 一些现有的 object 而不是创建一个新的,这违反了供应商合同。
在 collect()
的 3-arg 形式的使用中,三个函数中有两个违反了它们的约定。这不应该告诉我们以不同的方式做事吗?
这里的主要工作是由累加器函数完成的。要完成 fold-style 减少,我们可以使用 forEachOrdered()
以严格的 left-to-right 顺序应用此函数。我们必须在前后做一些设置和完成代码,但这没问题:
StringBuilder a = new StringBuilder();
list.parallelStream()
.forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());
自然地,这在并行时工作得很好,尽管 运行 并行的性能优势可能会被 forEachOrdered()
.
的排序要求抵消
总而言之,如果您发现自己想要进行可变缩减,但缺少关联组合器功能,导致您将流限制为顺序执行,请将问题重铸为 fold-left 运算并在你的累加器函数上使用 forEachRemaining()
。
示例程序:
public final class CollectorTest
{
private CollectorTest()
{
}
private static <T> BinaryOperator<T> nope()
{
return (t, u) -> { throw new UnsupportedOperationException("nope"); };
}
public static void main(final String... args)
{
final Collector<Integer, ?, List<Integer>> c
= Collector.of(ArrayList::new, List::add, nope());
IntStream.range(0, 10_000_000).boxed().collect(c);
}
}
所以,为了简化这里的事情,没有最终的转换,所以生成的代码非常简单。
现在,IntStream.range()
生成顺序流。我只是将结果装入 Integer
中,然后我设计的 Collector
将它们收集到 List<Integer>
中。很简单。
而且无论我 运行 这个示例程序多少次,UnsupportedOperationException
永远不会命中,这意味着我的虚拟组合器永远不会被调用。
我有点预料到这一点,但后来我对流的误解已经够多了,我不得不问这个问题...
当流 保证 是连续的时,是否可以调用 Collector
的组合器?
正如@MarkoTopolnik 和@Duncan 在之前的评论中所观察到的那样,不能保证在顺序模式下调用 Collector.combiner()
会产生减少的结果。事实上,Java 文档在这一点上有点主观,可能导致不恰当的解释。
(...) A parallel implementation would partition the input, create a result container for each partition, accumulate the contents of each partition into a subresult for that partition, and then use the combiner function to merge the subresults into a combined result.
根据 NoBlogDefFound 组合器仅在并行模式下使用。部分引述如下:
combiner() is used to join two accumulators together into one. It is used when collector is executed in parallel, splitting input Stream and collecting parts independently first.
为了更清楚地说明这个问题,我重写了第一段代码并采用了两种方法(串行和并行)。
public final class CollectorTest
{
private CollectorTest()
{
}
private static <T> BinaryOperator<T> nope()
{
return (t, u) -> { throw new UnsupportedOperationException("nope"); };
}
public static void main(final String... args)
{
final Collector<Integer, ?, List<Integer>> c =
Collector
.of(ArrayList::new, List::add, nope());
// approach sequential
Stream<Integer> sequential = IntStream
.range(0, 10_000_000)
.boxed();
System.out.println("isParallel:" + sequential.isParallel());
sequential
.collect(c);
// approach parallel
Stream<Integer> parallel = IntStream
.range(0, 10_000_000)
.parallel()
.boxed();
System.out.println("isParallel:" + parallel.isParallel());
parallel
.collect(c);
}
}
在运行这段代码之后我们可以得到输出:
isParallel:false
isParallel:true
Exception in thread "main" java.lang.UnsupportedOperationException: nope
at com.Whosebug.lambda.CollectorTest.lambda$nope[=11=](CollectorTest.java:18)
at com.Whosebug.lambda.CollectorTest$$Lambda/2001049719.apply(Unknown Source)
at java.util.stream.ReduceOpsReducingSink.combine(ReduceOps.java:174)
at java.util.stream.ReduceOpsReducingSink.combine(ReduceOps.java:160)
所以,根据这个结果我们可以推断Collector's combiner
只能被并行执行调用。
仔细阅读 ReduceOps.java 中的流实现代码会发现,组合函数仅在 ReduceTask
完成时调用,并且 ReduceTask
实例仅在评估管道时使用在平行下。因此,在当前的实现中, 在评估顺序管道时永远不会调用组合器。
但是,规范中没有任何内容可以保证这一点。 Collector
是一个对其实现有要求的接口,并且没有为顺序流授予豁免。就个人而言,我很难想象为什么顺序流水线评估可能需要调用组合器,但比我更有想象力的人可能会找到它的巧妙用途,并实现它。规范允许,即使今天的实现没有做到,你还是要考虑一下。
这应该不足为奇。流API的设计中心是支持与顺序执行平等的并行执行。当然,程序可以观察到它是顺序执行还是并行执行。但是 API 的设计是为了支持一种允许任一种编程风格。
如果您正在编写收集器并且发现不可能(或不方便或困难)编写关联组合器函数,导致您想要将流限制为顺序执行,也许这意味着您朝着错误的方向前进。是时候退后一步,考虑以不同的方式解决问题了。
不需要关联组合器函数的常见 reduction-style 操作称为 fold-left。主要特点是严格应用折叠功能 left-to-right,一次进行一个。我不知道并行化 fold-left.
的方法当人们试图以我们一直在谈论的方式扭曲收藏家时,他们通常会寻找类似 fold-left 的东西。 Streams API 没有对此操作的直接 API 支持,但它很容易编写。例如,假设您想使用以下操作减少字符串列表:重复第一个字符串,然后追加第二个。很容易证明这个操作不是关联的:
List<String> list = Arrays.asList("a", "b", "c", "d", "e");
System.out.println(list.stream()
.collect(StringBuilder::new,
(a, b) -> a.append(a.toString()).append(b),
(a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE
运行 依次产生所需的输出:
aabaabcaabaabcdaabaabcaabaabcde
但是当运行并行时,它可能会产生这样的结果:
aabaabccdde
因为它是 "works" 顺序的,我们可以通过调用 sequential()
来强制执行它,并通过让组合器抛出异常来支持它。此外,供应商必须恰好被调用一次。没有办法合并中间结果,所以如果供应商被调用两次,我们就已经有麻烦了。但是由于我们 "know" 供应商在顺序模式下只被调用一次,所以大多数人并不担心这一点。事实上,我看到有人写 "suppliers" return 一些现有的 object 而不是创建一个新的,这违反了供应商合同。
在 collect()
的 3-arg 形式的使用中,三个函数中有两个违反了它们的约定。这不应该告诉我们以不同的方式做事吗?
这里的主要工作是由累加器函数完成的。要完成 fold-style 减少,我们可以使用 forEachOrdered()
以严格的 left-to-right 顺序应用此函数。我们必须在前后做一些设置和完成代码,但这没问题:
StringBuilder a = new StringBuilder();
list.parallelStream()
.forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());
自然地,这在并行时工作得很好,尽管 运行 并行的性能优势可能会被 forEachOrdered()
.
总而言之,如果您发现自己想要进行可变缩减,但缺少关联组合器功能,导致您将流限制为顺序执行,请将问题重铸为 fold-left 运算并在你的累加器函数上使用 forEachRemaining()
。