Java 流 - 有效地对已排序流上的项目进行分组

Java Streams - grouping items on sorted streams efficiently

我正在寻找一种实现非终端分组操作的方法,这样内存开销就会最小。

例如,考虑 distinct()。在一般情况下,它别无选择,只能收集所有不同的项目,然后才将它们向前传输。但是,如果我们知道输入流已经排序,则可以使用最少的内存完成操作 "on-the-fly"。

我知道我可以使用迭代器包装器并自己实现分组逻辑来为迭代器实现这一点。有没有更简单的方法来使用流 API 来实现这个?

--编辑--

我找到了一种滥用 Stream.flatMap(..) 的方法来实现这一点:

  private static class DedupSeq implements IntFunction<IntStream> {
    private Integer prev;

    @Override
    public IntStream apply(int value) {
      IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value);
      prev = value;
      return res;
    }    
  }

然后:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println);

打印:

1
3
4
5

通过一些更改,相同的技术可用于任何类型的内存高效序列流分组。无论如何,我不太喜欢这个解决方案,我一直在寻找更自然的东西(例如映射或过滤的工作方式)。此外,我在这里打破了约定,因为提供给 flatMap(..) 的函数是有状态的。

正确地做到这一点的方法是将流变成一个拆分器,然后根据返回的拆分器的属性包装它

  • 如果源既未排序也未区分,则使用并发集执行简单的重复数据删除
  • 如果源拆分器已排序,则执行优化的优化去重复。
    支持 trySplit 操作将很棘手,因为它可能必须将子拆分器推进几步,直到它可以确定它是没有看到 运行 非不同元素的尾部。
  • 如果源已经不同,则 returns 按原样拆分器

一旦你有了那个拆分器,你就可以把它变回一个具有相同属性的流,并继续对它进行流操作

由于我们无法修改现有的 jdk 接口,因此助手 API 必须看起来更像这样:dedup(IntStream.of(...).map(...)).collect(...).


如果您检查 java.util.stream.DistinctOps.makeRef(AbstractPipeline<?, T, ?>) 的来源,您会注意到 JDK 或多或少会为基于引用的流执行此操作。

只是 IntStream 实现 (java.util.stream.IntPipeline.distinct()) 采用了一种低效的方法,没有利用 DISTINCTSORTED

它只是盲目地将 IntStream 转换为盒装 Integer 流并使用基于引用的重复数据删除,而不传递适当的标志以提高内存效率。

如果这在 jdk9 中还没有修复,那么它可能值得一个错误,因为如果流操作不必要地丢弃流标志,它本质上是不必要的内存消耗和优化潜力的浪费。

如果你想要一个不向不应该有可变状态的函数添加可变状态的解决方案,你可以诉诸 collect:

static void distinctForSorted(IntStream s, IntConsumer action) {
    s.collect(()->new long[]{Long.MIN_VALUE},
              (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
              (a, b)->{ throw new UnsupportedOperationException(); });
}

这是使用可变容器的预期方式,但它不能并行工作,因为在任意流位置拆分意味着有可能在两个(或更多)线程中遇到一个值。

如果您想要通用 IntStream 而不是 forEach 操作,则首选 Spliterator 低级解决方案,尽管会增加复杂性。

static IntStream distinctForSorted(IntStream s) {
    Spliterator.OfInt sp=s.spliterator();
    return StreamSupport.intStream(
      new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
      Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
        long last=Long.MIN_VALUE;
        @Override
        public boolean tryAdvance(IntConsumer action) {
            long prev=last;
            do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last);
            return true;
        }
        @Override
        public void forEachRemaining(IntConsumer action) {
            sp.forEachRemaining(distinct(action));
        }
        @Override
        public Comparator<? super Integer> getComparator() {
            return null;
        }
        private IntConsumer distinct(IntConsumer c) {
            return i-> {
                if(i==last) return;
                assert i>last;
                last=i;
                c.accept(i);
            };
        }
    }, false);
}

它甚至继承了并行支持,尽管它通过在另一个线程中处理它们之前预取一些值来工作,因此它不会加速 distinct 操作,但可能会进行后续操作, 如果有计算密集型的话。


为了完成,这里有一个独特的任意操作,即未排序的 IntStreams,它不依赖于“装箱加 HashMap”,因此可能有更好的内存占用:

static IntStream distinct(IntStream s) {
    boolean parallel=s.isParallel();
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
    if(parallel) s=s.parallel();
    return s;
}

它仅适用于正 int 值;将其扩展到完整的 32 位范围将需要两个 BitSet,因此看起来不那么简洁,但通常用例允许将存储限制在 31 位范围内甚至更低……