从一个长流创建流

Create stream of streams from one long stream

我想根据 Streams 的内容将单个 Stream 拆分为 StreamStreams。结果 Stream 应该包含部分原始流数据。

我的实际应用比较复杂(它是将时间间隔列表内的日志行分组),但我的问题是如何处理流,所以这里我问一个简化的例子。

例题

我希望能够根据重复的相同数字将 Stream<Integer> 拆分为 Stream<Stream<Integer>>,只留下奇数流。

例如,以下流包含:

{1,1,1,2,2,2,3,6,7,7,1,1}

需要生成包含以下内容的流:

{{1,1,1},{3},{7,7},{1,1}}

省略偶数我可以用过滤器开始(或结束):

Stream<Integer> input = ...;
Straem<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();

这是不受欢迎的,因为这意味着对每个输入值进行两次评估,这是可以接受的,但我希望避免这种情况。

解决方案的想法

我当前的解决方案是迭代流的内容并创建 List<List<Integer>> 并将其转换为 Stream<Stream<Integer>>。然而,这意味着完整的结果保存在内存中(这对我的应用程序来说是不希望的)。

我也认为我可以通过编写自己的 Iterator 从流中读取来实现这一点,但我不确定这将如何工作。

问题

如何根据原始 Stream 的内容将 Stream 转换为 StreamsStream,而不将完整结果存储为 ListLists 首先。

恐怕这是行不通的,至少不是以一种好的方式。即使您将元素映射到流中并减少它们,这些内部流也必须知道它们包含哪些元素,因此它们必须存储一些东西。

最简单的解决方案是只使用 groupingBy,但它会将所有结果存储在地图中:

List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());

您可以尝试使用 reduce 操作,但它需要您实现自己的 Stream of Streams,您必须在其中存储每个流包含的元素。更何况要实现起来还要费一番功夫

对于你的情况,我能想到的最佳解决方案是遍历列表两次:

public static void main(String[] args) {
    List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);

    input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
        List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
        System.out.println(subList); // do something with the stream instead of collecting to list
    });
}

private static boolean isOdd(Integer i) {
    return (i & 1) == 1;
}

但是请注意,它具有 O(n^2) 时间复杂度。

编辑:

此解决方案将只有局部元素组。它只存储当前本地组。

public static void main(String[] args) {
    Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);

    Iterator<Integer> iterator = input.iterator();
    int first;
    int second = iterator.next();

    List<Integer> buffer = new ArrayList<>();
    buffer.add(second);

    do {
        first = second;
        second = iterator.next();

        if (Objects.equals(first, second)) {
            buffer.add(second);
        } else {
            doSomethingWithTheGroup(buffer);
            buffer = new ArrayList<>(); // let GC remove the previous buffer
            buffer.add(second);
        }
    } while (iterator.hasNext());
    doSomethingWithTheGroup(buffer);
}

private static void doSomethingWithTheGroup(List<Integer> buffer) {
    System.out.println(buffer);
}

private static boolean isOdd(Integer i) {
    return (i & 1) == 1;
}

输出:

[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]

像@Jaroslaw 一样,我也使用 Map 来保存不同的流。但是, 地图将保存根据输入构建的流而不是预先收集的流是可行的。使用 Stream.concatStream.of 您可以将一个元素添加到流中:

    Map<Integer, Stream<Integer>> streamMap = new HashMap<>();

    int[] arr = {1,1,1,2,2,2,3,6,7,7,1,1};
    Arrays.stream(arr)
    .filter(this::isOdd)
    .forEach(i -> {
        Stream<Integer> st = streamMap.get(i);
        if (st == null)  st = Stream.of(i);
        else st = Stream.concat(st, Stream.of(i));
        streamMap.put(i, st);
    });

    streamMap.entrySet().stream().forEach(e -> {
        System.out.print(e.getKey() + "={");
        e.getValue().forEach(System.out::print);
        System.out.println("}");
    });

输出:

1={11111}
3={3}
7={77}

您可能想要实现自己的 aggregating spliterator to do this. There's already something similar in the proton-pack 库(第一个 link 重定向到在 proton-pack 中实现的库)。

注意你得到一个Stream<List<Integer>>(你可以尝试修改实现直接有一个Stream<Stream<Integer>>,但你总是需要缓冲少量元素;取决于window 的大小;测试是否应该创建一个新的 window)。例如:

StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), 
                      Objects::equals)
           .forEach(System.out::println);

输出:

[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]

您可以使用我的 StreamEx library. It has groupRuns 来完成这项工作:

List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
    .groupRuns(Integer::equals)
    .map(List::stream);

用法示例:

streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);

输出:

1,1,1
3
7,7
1,1

类似于 protonpack 库,里面有一个自定义拆分器,但是使用 StreamEx 可以利用并行处理(protonpack 根本不拆分)。

在顺序处理中,一次最多有一个中间列表驻留在内存中(其他有资格进行 GC)。如果您仍然担心内存消耗(例如,您有很长的组),自 StreamEx 0.3.3 以来,有另一种方法可以解决此任务:

Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
        .runLengths()
        .mapKeyValue(StreamEx::constant);

runLengths 方法 returns 条目流,其中键是元素,值是相邻重复元素的数量。之后使用 StreamEx.constant,这是 Stream.generate(() -> value).limit(length) 的快捷方式。因此,即使对于非常长的组,您也会有恒定的中间内存消耗。当然这个版本也是并行友好的。

更新: StreamEx 0.3.3 已发布,因此第二个解决方案现在也符合条件。