如何根据顺序将流的元素收集到固定大小的组中

How to collect element of the stream into groups of fixed size in accordance with their order

我正在尝试使用 Java 8 对 Stream<BigDecimal> 执行以下操作,但卡在 步骤 2

  1. 删除 null 负值
  2. 创建大小为 3 个元素的组。保留平均小于30的组,否则丢弃。

例子。让我们假设以下情况:

stream<Bigdecimal> input = {4,5,61,3,9,3,1,null,-4,7,2,-8,6,-3,null}; //technically its incorrect but just assume.

我能够解决 步骤 1,如下所示:

Stream<BigDecimal> newInList = input.filter(bd -> (bd != null && bd.signum() > 0));

我无法执行 步骤 2 - 创建 3 个元素的组。

第 2 步 的预期结果:{4,5,6},{61,3,9},{3,1,7}

我正在寻找具有 Java 8 个流的解决方案。

因此您需要根据顺序从流中提取 大小为 3 个元素 的组。

可以使用 Stream API 通过实现实现 Collector 接口的 自定义收集器 来完成.

在初始化 GroupCollector 时必须提供组的大小(这样做是为了使收集器更灵活并避免 hard-coding 3 的值在 class).

Deque<List<T>> 用作可变容器,因为 Deque 接口提供了对最后一个元素的方便访问。

combiner()方法提供了如何组合不同线程获得的执行结果的逻辑。 并行流collect() 操作提供了 保证 ,即流的初始顺序将被保留,并且来自不同线程的结果将按照分配给他们的任务的顺序加入。因此这个解决方案可以并行化。

组合由不同线程产生的两个队列的逻辑涉及以下问题:

  • 确保所有组(应该是最后一个组除外)都恰好有 3 个元素。因此我们不能简单地将second deque的所有内容添加到first deque。相反,第二个双端队列的每一组都必须一个一个地处理。
  • 已创建的列表应重复使用

finisher() 函数将丢弃 deque 中的最后一个 list 如果它的大小小于 groupSize(PO 在评论中提供的要求)。

例如,我使用了问题中的数字序列。

    public static void main(String[] args) {
        Stream<BigDecimal> source =
                IntStream.of(4, 5, 6, 61, 3, 9, 3, 1, 7, 2, 6)
                .mapToObj(BigDecimal::valueOf);

        System.out.println(createGroups(source)
                .flatMap(List::stream)
                .collect(Collectors.toList())); // collecting to list for demonstration purposes
    }

方法createGroups()

    public static Stream<List<BigDecimal>> createGroups(Stream<BigDecimal> source) {
        return source
                .collect(new GroupCollector<BigDecimal>(3))
                .stream()
                .filter(list -> averageIsLessThen(list, 30));
    }

收藏家

    public class GroupCollector<T> implements Collector<T, Deque<List<T>>, Deque<List<T>>> {
        private final int groupSize;

        public GroupCollector(int groupSize) {
            this.groupSize = groupSize;
        }

        @Override
        public Supplier<Deque<List<T>>> supplier() {
            return ArrayDeque::new;
        }

        @Override
        public BiConsumer<Deque<List<T>>, T> accumulator() {
            return (deque, next) -> {
                if (deque.isEmpty() || deque.getLast().size() == groupSize) {
                    List<T> group = new ArrayList<>();
                    group.add(next);
                    deque.addLast(group);
                } else {
                    deque.getLast().add(next);
                }
            };
        }

        @Override
        public BinaryOperator<Deque<List<T>>> combiner() {
            return (deque1, deque2) -> {
                if (deque1.isEmpty()) {
                    return deque2;
                } else if (deque1.getLast().size() == groupSize) {
                    deque1.addAll(deque2);
                    return deque1;
                }
                // last group in the deque1 has a size less than groupSize
                List<T> curGroup = deque1.pollLast();
                List<T> nextGroup;
                for (List<T> nextItem: deque2) {
                    nextGroup = nextItem;
                    Iterator<T> iter = nextItem.iterator();
                    while (iter.hasNext() && curGroup.size() < groupSize) {
                        curGroup.add(iter.next());
                        iter.remove();
                    }
                    deque1.add(curGroup);
                    curGroup = nextGroup;
                }
                if (curGroup.size() != 0) {
                    deque1.add(curGroup);
                }
                return deque1;
            };
        }

        @Override
        public Function<Deque<List<T>>, Deque<List<T>>> finisher() {
            return deque -> {
                if (deque.peekLast() != null && deque.peekLast().size() < groupSize) {
                    deque.pollLast();
                }
                return deque;
            };
        }

        @Override
        public Set<Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }

用于根据其平均值[=76]验证元素的辅助方法 =](如果您想知道 RoundingMode 是什么意思,请阅读 this answer)。

    private static boolean averageIsLessThen(List<BigDecimal> list, double target) {
        BigDecimal average = list.stream()
                .reduce(BigDecimal.ZERO, BigDecimal::add)
                .divide(BigDecimal.valueOf(list.size()), RoundingMode.HALF_UP);

        return average.compareTo(BigDecimal.valueOf(target)) < 0;
    }

输出预期结果: { 4, 5, 6, 61, 3, 9, 3, 1, 7 }, 由 P​​O 提供)

[4, 5, 6, 61, 3, 9, 3, 1, 7]