从一个长流创建流
Create stream of streams from one long stream
我想根据 Streams
的内容将单个 Stream
拆分为 Stream
个 Streams
。结果 Stream
应该包含部分原始流数据。
我的实际应用比较复杂(它是将时间间隔列表内的日志行分组),但我的问题是如何处理流,所以这里我问一个简化的例子。
例题
我希望能够根据重复的相同数字将 Stream<Integer>
拆分为 Stream<Stream<Integer>>
,只留下奇数流。
例如,以下流包含:
{1,1,1,2,2,2,3,6,7,7,1,1}
需要生成包含以下内容的流:
{{1,1,1},{3},{7,7},{1,1}}
省略偶数我可以用过滤器开始(或结束):
Stream<Integer> input = ...;
Straem<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();
这是不受欢迎的,因为这意味着对每个输入值进行两次评估,这是可以接受的,但我希望避免这种情况。
解决方案的想法
我当前的解决方案是迭代流的内容并创建 List<List<Integer>>
并将其转换为 Stream<Stream<Integer>>
。然而,这意味着完整的结果保存在内存中(这对我的应用程序来说是不希望的)。
我也认为我可以通过编写自己的 Iterator
从流中读取来实现这一点,但我不确定这将如何工作。
问题
如何根据原始 Stream
的内容将 Stream
转换为 Streams
的 Stream
,而不将完整结果存储为 List
的 Lists
首先。
恐怕这是行不通的,至少不是以一种好的方式。即使您将元素映射到流中并减少它们,这些内部流也必须知道它们包含哪些元素,因此它们必须存储一些东西。
最简单的解决方案是只使用 groupingBy
,但它会将所有结果存储在地图中:
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());
您可以尝试使用 reduce
操作,但它需要您实现自己的 Stream of Streams,您必须在其中存储每个流包含的元素。更何况要实现起来还要费一番功夫
对于你的情况,我能想到的最佳解决方案是遍历列表两次:
public static void main(String[] args) {
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
System.out.println(subList); // do something with the stream instead of collecting to list
});
}
private static boolean isOdd(Integer i) {
return (i & 1) == 1;
}
但是请注意,它具有 O(n^2)
时间复杂度。
编辑:
此解决方案将只有局部元素组。它只存储当前本地组。
public static void main(String[] args) {
Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Iterator<Integer> iterator = input.iterator();
int first;
int second = iterator.next();
List<Integer> buffer = new ArrayList<>();
buffer.add(second);
do {
first = second;
second = iterator.next();
if (Objects.equals(first, second)) {
buffer.add(second);
} else {
doSomethingWithTheGroup(buffer);
buffer = new ArrayList<>(); // let GC remove the previous buffer
buffer.add(second);
}
} while (iterator.hasNext());
doSomethingWithTheGroup(buffer);
}
private static void doSomethingWithTheGroup(List<Integer> buffer) {
System.out.println(buffer);
}
private static boolean isOdd(Integer i) {
return (i & 1) == 1;
}
输出:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
像@Jaroslaw 一样,我也使用 Map 来保存不同的流。但是, 地图将保存根据输入构建的流而不是预先收集的流是可行的。使用 Stream.concat
和 Stream.of
您可以将一个元素添加到流中:
Map<Integer, Stream<Integer>> streamMap = new HashMap<>();
int[] arr = {1,1,1,2,2,2,3,6,7,7,1,1};
Arrays.stream(arr)
.filter(this::isOdd)
.forEach(i -> {
Stream<Integer> st = streamMap.get(i);
if (st == null) st = Stream.of(i);
else st = Stream.concat(st, Stream.of(i));
streamMap.put(i, st);
});
streamMap.entrySet().stream().forEach(e -> {
System.out.print(e.getKey() + "={");
e.getValue().forEach(System.out::print);
System.out.println("}");
});
输出:
1={11111}
3={3}
7={77}
您可能想要实现自己的 aggregating spliterator to do this. There's already something similar in the proton-pack 库(第一个 link 重定向到在 proton-pack 中实现的库)。
注意你得到一个Stream<List<Integer>>
(你可以尝试修改实现直接有一个Stream<Stream<Integer>>
,但你总是需要缓冲少量元素;取决于window 的大小;测试是否应该创建一个新的 window)。例如:
StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1),
Objects::equals)
.forEach(System.out::println);
输出:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
您可以使用我的 StreamEx
library. It has groupRuns
来完成这项工作:
List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.groupRuns(Integer::equals)
.map(List::stream);
用法示例:
streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);
输出:
1,1,1
3
7,7
1,1
类似于 protonpack 库,里面有一个自定义拆分器,但是使用 StreamEx 可以利用并行处理(protonpack 根本不拆分)。
在顺序处理中,一次最多有一个中间列表驻留在内存中(其他有资格进行 GC)。如果您仍然担心内存消耗(例如,您有很长的组),自 StreamEx 0.3.3 以来,有另一种方法可以解决此任务:
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.runLengths()
.mapKeyValue(StreamEx::constant);
runLengths
方法 returns 条目流,其中键是元素,值是相邻重复元素的数量。之后使用 StreamEx.constant
,这是 Stream.generate(() -> value).limit(length)
的快捷方式。因此,即使对于非常长的组,您也会有恒定的中间内存消耗。当然这个版本也是并行友好的。
更新: StreamEx 0.3.3 已发布,因此第二个解决方案现在也符合条件。
我想根据 Streams
的内容将单个 Stream
拆分为 Stream
个 Streams
。结果 Stream
应该包含部分原始流数据。
我的实际应用比较复杂(它是将时间间隔列表内的日志行分组),但我的问题是如何处理流,所以这里我问一个简化的例子。
例题
我希望能够根据重复的相同数字将 Stream<Integer>
拆分为 Stream<Stream<Integer>>
,只留下奇数流。
例如,以下流包含:
{1,1,1,2,2,2,3,6,7,7,1,1}
需要生成包含以下内容的流:
{{1,1,1},{3},{7,7},{1,1}}
省略偶数我可以用过滤器开始(或结束):
Stream<Integer> input = ...;
Straem<Stream<Integer>> output = input.filter(this::isOdd).someOtherOperation();
这是不受欢迎的,因为这意味着对每个输入值进行两次评估,这是可以接受的,但我希望避免这种情况。
解决方案的想法
我当前的解决方案是迭代流的内容并创建 List<List<Integer>>
并将其转换为 Stream<Stream<Integer>>
。然而,这意味着完整的结果保存在内存中(这对我的应用程序来说是不希望的)。
我也认为我可以通过编写自己的 Iterator
从流中读取来实现这一点,但我不确定这将如何工作。
问题
如何根据原始 Stream
的内容将 Stream
转换为 Streams
的 Stream
,而不将完整结果存储为 List
的 Lists
首先。
恐怕这是行不通的,至少不是以一种好的方式。即使您将元素映射到流中并减少它们,这些内部流也必须知道它们包含哪些元素,因此它们必须存储一些东西。
最简单的解决方案是只使用 groupingBy
,但它会将所有结果存储在地图中:
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Map<Integer, List<Integer>> grouped = input.stream().collect(groupingBy(i -> i));
Stream<Stream<Integer>> streamOfStreams = grouped.values().stream().map(list -> list.stream());
您可以尝试使用 reduce
操作,但它需要您实现自己的 Stream of Streams,您必须在其中存储每个流包含的元素。更何况要实现起来还要费一番功夫
对于你的情况,我能想到的最佳解决方案是遍历列表两次:
public static void main(String[] args) {
List<Integer> input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> {
List<Integer> subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList());
System.out.println(subList); // do something with the stream instead of collecting to list
});
}
private static boolean isOdd(Integer i) {
return (i & 1) == 1;
}
但是请注意,它具有 O(n^2)
时间复杂度。
编辑:
此解决方案将只有局部元素组。它只存储当前本地组。
public static void main(String[] args) {
Stream<Integer> input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Iterator<Integer> iterator = input.iterator();
int first;
int second = iterator.next();
List<Integer> buffer = new ArrayList<>();
buffer.add(second);
do {
first = second;
second = iterator.next();
if (Objects.equals(first, second)) {
buffer.add(second);
} else {
doSomethingWithTheGroup(buffer);
buffer = new ArrayList<>(); // let GC remove the previous buffer
buffer.add(second);
}
} while (iterator.hasNext());
doSomethingWithTheGroup(buffer);
}
private static void doSomethingWithTheGroup(List<Integer> buffer) {
System.out.println(buffer);
}
private static boolean isOdd(Integer i) {
return (i & 1) == 1;
}
输出:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
像@Jaroslaw 一样,我也使用 Map 来保存不同的流。但是, 地图将保存根据输入构建的流而不是预先收集的流是可行的。使用 Stream.concat
和 Stream.of
您可以将一个元素添加到流中:
Map<Integer, Stream<Integer>> streamMap = new HashMap<>();
int[] arr = {1,1,1,2,2,2,3,6,7,7,1,1};
Arrays.stream(arr)
.filter(this::isOdd)
.forEach(i -> {
Stream<Integer> st = streamMap.get(i);
if (st == null) st = Stream.of(i);
else st = Stream.concat(st, Stream.of(i));
streamMap.put(i, st);
});
streamMap.entrySet().stream().forEach(e -> {
System.out.print(e.getKey() + "={");
e.getValue().forEach(System.out::print);
System.out.println("}");
});
输出:
1={11111}
3={3}
7={77}
您可能想要实现自己的 aggregating spliterator to do this. There's already something similar in the proton-pack 库(第一个 link 重定向到在 proton-pack 中实现的库)。
注意你得到一个Stream<List<Integer>>
(你可以尝试修改实现直接有一个Stream<Stream<Integer>>
,但你总是需要缓冲少量元素;取决于window 的大小;测试是否应该创建一个新的 window)。例如:
StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1),
Objects::equals)
.forEach(System.out::println);
输出:
[1, 1, 1]
[2, 2, 2]
[3]
[6]
[7, 7]
[1, 1]
您可以使用我的 StreamEx
library. It has groupRuns
来完成这项工作:
List<Integer> input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1);
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.groupRuns(Integer::equals)
.map(List::stream);
用法示例:
streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println);
输出:
1,1,1
3
7,7
1,1
类似于 protonpack 库,里面有一个自定义拆分器,但是使用 StreamEx 可以利用并行处理(protonpack 根本不拆分)。
在顺序处理中,一次最多有一个中间列表驻留在内存中(其他有资格进行 GC)。如果您仍然担心内存消耗(例如,您有很长的组),自 StreamEx 0.3.3 以来,有另一种方法可以解决此任务:
Stream<Stream<Integer>> streams = StreamEx.of(input).filter(this::isOdd)
.runLengths()
.mapKeyValue(StreamEx::constant);
runLengths
方法 returns 条目流,其中键是元素,值是相邻重复元素的数量。之后使用 StreamEx.constant
,这是 Stream.generate(() -> value).limit(length)
的快捷方式。因此,即使对于非常长的组,您也会有恒定的中间内存消耗。当然这个版本也是并行友好的。
更新: StreamEx 0.3.3 已发布,因此第二个解决方案现在也符合条件。