Java 收集器准备输入列表

Java collector teeing a list of inputs

我正在尝试实现一个简单的收集器,它采用收集器列表并同时以与流略有不同的方式收集值。

Collectors.teeing非常相似,但不同之处在于

  1. 收到一个收集器列表而不是两个
  2. 要求所有收集器产生相同类型的值

我想要的类型签名是

public static <T, R> Collector<T, ?, List<R>> list(
      final List<Collector<T, ?, R>> downstreamCollectors);

创建此类收集器的一种方法是递归配对发球收集器,如下所示:

public static <T, R> Collector<T, ?, List<R>> list(
    final List<Collector<T, ?, R>> downstreamCollectors) {
  return listrec(
      Collectors.collectingAndThen(downstreamCollectors.get(0), List::of),
      downstreamCollectors.stream().skip(1).toList());
}

private static <T, R> Collector<T, ?, List<R>> listrec(
    final Collector<T, ?, List<R>> teedCollectors,
    final List<Collector<T, ?, R>> downstreamCollectors) {
  if (downstreamCollectors.size() == 0) {
    return teedCollectors;
  } else {
    return listrec(
        teeing(
            teedCollectors,
            downstreamCollectors.get(0),
            (l, s) -> Stream.concat(l.stream(), Stream.of(s)).toList()),
        downstreamCollectors.stream().skip(1).toList());
  }
}

这个解决方案感觉有点“不对劲”,所以我尝试自己创建收集器,例如:

public static <T, R> Collector<T, ?, List<R>> list2(
    final List<Collector<T, ?, R>> downstreamCollectors) {
  return Collector.of(
      () -> downstreamCollectors.stream().map(c -> c.supplier().get()).toList(),
      (accumulators, t) ->
          IntStream.range(0, downstreamCollectors.size())
              .forEach(
                  i -> downstreamCollectors.get(i).accumulator().accept(accumulators.get(i), t)),
      (accumulator1, accumulator2) ->
          IntStream.range(0, downstreamCollectors.size())
              .mapToObj(
                  i ->
                      downstreamCollectors
                          .get(i)
                          .combiner()
                          .apply(accumulator1.get(i), accumulator2.get(i)))
              .toList(),
      accumulators ->
          IntStream.range(0, downstreamCollectors.size())
              .mapToObj(i -> downstreamCollectors.get(i).finisher().apply(accumulators.get(i)))
              .toList());
}

由于下游收集器的累加器类型中的无限通配符,这无法编译。将类型签名更改为

public static <T, A, R> Collector<? super T, ?, List<R>> list2(
    final List<Collector<? super T, A, R>> downstreamCollectors);

解决了这个问题,但不幸的是,由于下游收集器(如 java.util.stream.Collectors 中的内置收集器)通常会在累加器类型中使用无限通配符,因此该方法的可用性大大降低。

是否有另一种方法来实现这个,在方法签名中保留通配符?

我正在使用 OpenJDK 17.0.2。

无法以类型安全的方式处理具有任意累加器类型的收集器列表作为平面列表,因为它需要声明 n 类型变量来捕获这些类型,其中 n 是实际列表大小。

因此,您只能将处理实现为操作的组合,每个操作都具有在编译时已知的有限数量的组件,就像您的递归方法一样。

这仍有可能进行简化,例如将 downstreamCollectors.size() == 0 替换为 downstreamCollectors.isEmpty() 或将 downstreamCollectors.stream().skip(1).toList() 替换为免费复制 downstreamCollectors.subList(1, downstreamCollectors.size())

但最大的影响是用 Stream Reduction 操作替换递归代码:

public static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
    return collectors.stream()
            .<Collector<T, ?, List<R>>>map(c-> Collectors.collectingAndThen(c, List::of))
            .reduce((c1, c2) -> teeing(c1, c2,
                        (l1, l2) -> Stream.concat(l1.stream(), l2.stream()).toList()))
            .orElseThrow(() -> new IllegalArgumentException("no collector specified"));
}

如果您没有大量的收集器需要编写,这可能会工作得很好。这种简洁的解决方案的一个缺点是,在实际合并结果之前,每个结果都会被包装到一个元素列表中,甚至结果合并可能会承担多个列表复制操作。

可以使用

优化此结果处理
public static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
    int num = collectors.size();
    switch(num) {
        case 0: throw new IllegalArgumentException("no collector specified");
        case 1: return collectingAndThen(collectors.get(0), List::of);
        case 2: return teeing(collectors.get(0), collectors.get(1), List::of);
        case 3: return teeing(teeing(collectors.get(0), collectors.get(1), List::of),
                           collectors.get(2), (l,r) -> List.of(l.get(0), l.get(1), r));
        default:
    }
    Collector<T,?,List<R>> c = teeing(collectors.get(0), collectors.get(1), (r1, r2) -> {
        var list = new ArrayList<R>(num);
        list.add(r1);
        list.add(r2);
        return list;
    });
    for(int ix = 2; ix < num; ix ++) {
        c = teeing(c, collectors.get(ix), (list, r) -> { list.add(r); return list; });
    }
    return collectingAndThen(c, List::copyOf);
}

这为少数收集器提供了特殊情况,其结果可用于直接构建不可变结果列表。对于其他情况,在将列表转换为最终不可变列表之前,所有结果都首先添加到 ArrayList 中,以防止过多的列表复制。最后一步可以省略,如果获得不可变的结果列表并不重要,我只是尝试尽可能接近原始方法的 Stream.toList() 行为。

在 Stream 处理过程中,幕后仍然存在不平衡的递归结构,这禁止了真正大量的收集器。有两种方法可以解决这个问题。

  1. 实现您自己的类型安全的发球变体,它公开中间容器类型,以允许构建平衡树并通过遍历此树将所有结果收集到列表中而无需额外的中间存储。

  2. 放弃类型安全并使用平面列表和原始类型实现收集器。尽量限制不安全代码。

但是当您估计了要“开球”的收集器的预期数量并发现第一个解决方案足够好时,可能不需要这样做。