Java 按谓词将流拆分为流的流
Java split stream by predicate into stream of streams
我有数百个大型 (6GB) gzip 日志文件,我正在使用 GZIPInputStream
阅读这些文件并希望对其进行解析。假设每个都有格式:
Start of log entry 1
...some log details
...some log details
...some log details
Start of log entry 2
...some log details
...some log details
...some log details
Start of log entry 3
...some log details
...some log details
...some log details
我正在通过 BufferedReader.lines()
逐行传输 gzip 文件内容。流看起来像:
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
]
每个日志条目的开始都可以由谓词识别:line -> line.startsWith("Start of log entry")
。我想根据这个谓词将这个 Stream<String>
转换成 Stream<Stream<String>>
。每个 "substream" 应该在谓词为真时开始,并在谓词为假时收集行,直到下一次谓词为真,这表示该子流的结束和下一个子流的开始。结果如下:
[
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
],
]
从那里,我可以获取每个子流并将其映射到 new LogEntry(Stream<String> logLines)
,以便将相关的日志行聚合到 LogEntry
个对象中。
这里有一个粗略的想法:
import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.lang.System.out;
class Untitled {
static final String input =
"Start of log entry 1\n" +
" ...some log details\n" +
" ...some log details\n" +
" ...some log details\n" +
"Start of log entry 2\n" +
" ...some log details\n" +
" ...some log details\n" +
" ...some log details\n" +
"Start of log entry 3\n" +
" ...some log details\n" +
" ...some log details\n" +
" ...some log details";
static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry");
public static void main(String[] args) throws Exception {
try (ByteArrayInputStream gzipInputStream
= new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream );
BufferedReader reader = new BufferedReader( inputStreamReader )) {
reader.lines()
.splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
.map(LogEntry::new)
.forEach(out::println);
}
}
}
约束:我有数百个这样的大文件要并行处理(但每个文件只有一个顺序流),这使得将它们完全加载到内存中(例如通过将它们存储为 List<String> lines
) 不可行。
感谢任何帮助!
我认为主要问题是您正在逐行阅读并尝试在行之外创建一个 LogEntry
实例,而不是逐块阅读(这可能会覆盖很多行)。
为此,您可以使用 Scanner.findAll
(自 Java 9 起可用)和适当的正则表达式:
String input =
"Start of log entry 1\n" +
" ...some log details 1.1\n" +
" ...some log details 1.2\n" +
" ...some log details 1.3\n" +
"Start of log entry 2\n" +
" ...some log details 2.1\n" +
" ...some log details 2.2\n" +
" ...some log details 2.3\n" +
"Start of log entry 3\n" +
" ...some log details 3.1\n" +
" ...some log details 3.2\n" +
" ...some log details 3.3";
try (ByteArrayInputStream gzip =
new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
InputStreamReader reader = new InputStreamReader(gzip);
Scanner scanner = new Scanner(reader)) {
String START = "Start of log entry \d+";
Pattern pattern = Pattern.compile(
START + "(?<=" + START + ").*?(?=" + START + "|$)",
Pattern.DOTALL);
scanner.findAll(pattern)
.map(MatchResult::group)
.map(s -> s.split("\R"))
.map(LogEntry::new)
.forEach(System.out::println);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
因此,这是通过在 Scanner
实例中延迟查找匹配来实现的。 Scanner.findAll
returns 一个 Stream<MatchResult>
和 MatchResult.group()
returns 匹配的 String
。然后我们将这个字符串拆分为 line-breaks (\R
)。这 returns 一个 String[]
数组的每个元素都是每一行。然后,假设 LogEntry
有一个接受 String[]
参数的构造函数,我们将这些数组中的每一个都转换为一个 LogEntry
实例。最后,假设 LogEntry
有一个覆盖的 toString()
方法,我们将每个 LogEntry
实例打印到输出。
值得一提的是,Scanner
在流上调用 forEach
时开始工作。
另外一个注释是我们用来匹配输入中的日志条目的正则表达式。我不是正则表达式领域的专家,所以我几乎可以肯定这里还有很大的改进空间。首先,我们使用 Pattern.DOTALL
这样 .
不仅可以匹配普通字符,还可以匹配换行符。然后是实际的正则表达式。这个想法是它匹配并使用 Start of log entry \d+
,然后它使用 look-behind 来对抗 Start of log entry \d+
,然后它使用 Start of log entry \d+
中输入的字符=53=]non-greedy 方式(这是 .*?
部分)最后它 looks-ahead 检查是否有另一个出现 Start of log entry \d+
或是否已到达输入末尾。如果您想深入了解这个主题,请参阅此 amazing article about regular expressions。
如果您未使用 Java 9+,我不知道有任何类似的选择。不过,您可以做的是创建一个自定义 Spliterator
,它包装由 BufferedReader.lines()
返回的流返回的 Spliterator
,并向其添加所需的解析行为。然后,您需要从这个 Spliterator
中创建一个新的 Stream
。一点都不简单...
Frederico 的回答可能是解决这个特定问题的最佳方法。根据他对自定义 Spliterator
的最后想法,我将添加对 a similar question 的答案的改编版本,我在其中建议使用自定义迭代器来创建分块流。这种方法也适用于不是由输入读取器创建的其他流。
public class StreamSplitter<T>
implements Iterator<Stream<T>>
{
private Iterator<T> incoming;
private Predicate<T> startOfNewEntry;
private T nextLine;
public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
{
Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
return StreamSupport.stream(iterable.spliterator(), false);
}
private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
{
this.incoming = stream.iterator();
this.startOfNewEntry = startOfNewEntry;
if (incoming.hasNext())
nextLine = incoming.next();
}
@Override
public boolean hasNext()
{
return nextLine != null;
}
@Override
public Stream<T> next()
{
List<T> nextEntrysLines = new ArrayList<>();
do
{
nextEntrysLines.add(nextLine);
} while (incoming.hasNext()
&& !startOfNewEntry.test((nextLine = incoming.next())));
if (!startOfNewEntry.test(nextLine)) // incoming does not have next
nextLine = null;
return nextEntrysLines.stream();
}
}
例子
public static void main(String[] args)
{
Stream<String> flat = Stream.of("Start of log entry 1",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
"Start of log entry 3",
" ...some log details",
" ...some log details");
StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
.forEach(logEntry -> {
System.out.println("------------------");
logEntry.forEach(System.out::println);
});
}
// Output
// ------------------
// Start of log entry 1
// ...some log details
// ...some log details
// ------------------
// Start of log entry 2
// ...some log details
// ...some log details
// ------------------
// Start of log entry 3
// ...some log details
// ...some log details
迭代器总是向前看一行。一旦该 lline 是新条目的开头,它就会将前一个条目包装在流中,并将其 return 转换为 next
。工厂方法 streamOf
将此迭代器转换为流,以便在我上面给出的示例中使用。
我将拆分条件从正则表达式更改为 Predicate
,因此您可以借助多个正则表达式指定更复杂的条件,if-conditions,等等。
请注意,我只使用上面的示例数据对其进行了测试,因此我不知道它在更复杂、错误或空输入时的表现如何。
我有数百个大型 (6GB) gzip 日志文件,我正在使用 GZIPInputStream
阅读这些文件并希望对其进行解析。假设每个都有格式:
Start of log entry 1
...some log details
...some log details
...some log details
Start of log entry 2
...some log details
...some log details
...some log details
Start of log entry 3
...some log details
...some log details
...some log details
我正在通过 BufferedReader.lines()
逐行传输 gzip 文件内容。流看起来像:
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
]
每个日志条目的开始都可以由谓词识别:line -> line.startsWith("Start of log entry")
。我想根据这个谓词将这个 Stream<String>
转换成 Stream<Stream<String>>
。每个 "substream" 应该在谓词为真时开始,并在谓词为假时收集行,直到下一次谓词为真,这表示该子流的结束和下一个子流的开始。结果如下:
[
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
],
]
从那里,我可以获取每个子流并将其映射到 new LogEntry(Stream<String> logLines)
,以便将相关的日志行聚合到 LogEntry
个对象中。
这里有一个粗略的想法:
import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.lang.System.out;
class Untitled {
static final String input =
"Start of log entry 1\n" +
" ...some log details\n" +
" ...some log details\n" +
" ...some log details\n" +
"Start of log entry 2\n" +
" ...some log details\n" +
" ...some log details\n" +
" ...some log details\n" +
"Start of log entry 3\n" +
" ...some log details\n" +
" ...some log details\n" +
" ...some log details";
static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry");
public static void main(String[] args) throws Exception {
try (ByteArrayInputStream gzipInputStream
= new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream );
BufferedReader reader = new BufferedReader( inputStreamReader )) {
reader.lines()
.splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
.map(LogEntry::new)
.forEach(out::println);
}
}
}
约束:我有数百个这样的大文件要并行处理(但每个文件只有一个顺序流),这使得将它们完全加载到内存中(例如通过将它们存储为 List<String> lines
) 不可行。
感谢任何帮助!
我认为主要问题是您正在逐行阅读并尝试在行之外创建一个 LogEntry
实例,而不是逐块阅读(这可能会覆盖很多行)。
为此,您可以使用 Scanner.findAll
(自 Java 9 起可用)和适当的正则表达式:
String input =
"Start of log entry 1\n" +
" ...some log details 1.1\n" +
" ...some log details 1.2\n" +
" ...some log details 1.3\n" +
"Start of log entry 2\n" +
" ...some log details 2.1\n" +
" ...some log details 2.2\n" +
" ...some log details 2.3\n" +
"Start of log entry 3\n" +
" ...some log details 3.1\n" +
" ...some log details 3.2\n" +
" ...some log details 3.3";
try (ByteArrayInputStream gzip =
new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
InputStreamReader reader = new InputStreamReader(gzip);
Scanner scanner = new Scanner(reader)) {
String START = "Start of log entry \d+";
Pattern pattern = Pattern.compile(
START + "(?<=" + START + ").*?(?=" + START + "|$)",
Pattern.DOTALL);
scanner.findAll(pattern)
.map(MatchResult::group)
.map(s -> s.split("\R"))
.map(LogEntry::new)
.forEach(System.out::println);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
因此,这是通过在 Scanner
实例中延迟查找匹配来实现的。 Scanner.findAll
returns 一个 Stream<MatchResult>
和 MatchResult.group()
returns 匹配的 String
。然后我们将这个字符串拆分为 line-breaks (\R
)。这 returns 一个 String[]
数组的每个元素都是每一行。然后,假设 LogEntry
有一个接受 String[]
参数的构造函数,我们将这些数组中的每一个都转换为一个 LogEntry
实例。最后,假设 LogEntry
有一个覆盖的 toString()
方法,我们将每个 LogEntry
实例打印到输出。
值得一提的是,Scanner
在流上调用 forEach
时开始工作。
另外一个注释是我们用来匹配输入中的日志条目的正则表达式。我不是正则表达式领域的专家,所以我几乎可以肯定这里还有很大的改进空间。首先,我们使用 Pattern.DOTALL
这样 .
不仅可以匹配普通字符,还可以匹配换行符。然后是实际的正则表达式。这个想法是它匹配并使用 Start of log entry \d+
,然后它使用 look-behind 来对抗 Start of log entry \d+
,然后它使用 Start of log entry \d+
中输入的字符=53=]non-greedy 方式(这是 .*?
部分)最后它 looks-ahead 检查是否有另一个出现 Start of log entry \d+
或是否已到达输入末尾。如果您想深入了解这个主题,请参阅此 amazing article about regular expressions。
如果您未使用 Java 9+,我不知道有任何类似的选择。不过,您可以做的是创建一个自定义 Spliterator
,它包装由 BufferedReader.lines()
返回的流返回的 Spliterator
,并向其添加所需的解析行为。然后,您需要从这个 Spliterator
中创建一个新的 Stream
。一点都不简单...
Frederico 的回答可能是解决这个特定问题的最佳方法。根据他对自定义 Spliterator
的最后想法,我将添加对 a similar question 的答案的改编版本,我在其中建议使用自定义迭代器来创建分块流。这种方法也适用于不是由输入读取器创建的其他流。
public class StreamSplitter<T>
implements Iterator<Stream<T>>
{
private Iterator<T> incoming;
private Predicate<T> startOfNewEntry;
private T nextLine;
public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
{
Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
return StreamSupport.stream(iterable.spliterator(), false);
}
private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
{
this.incoming = stream.iterator();
this.startOfNewEntry = startOfNewEntry;
if (incoming.hasNext())
nextLine = incoming.next();
}
@Override
public boolean hasNext()
{
return nextLine != null;
}
@Override
public Stream<T> next()
{
List<T> nextEntrysLines = new ArrayList<>();
do
{
nextEntrysLines.add(nextLine);
} while (incoming.hasNext()
&& !startOfNewEntry.test((nextLine = incoming.next())));
if (!startOfNewEntry.test(nextLine)) // incoming does not have next
nextLine = null;
return nextEntrysLines.stream();
}
}
例子
public static void main(String[] args)
{
Stream<String> flat = Stream.of("Start of log entry 1",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
"Start of log entry 3",
" ...some log details",
" ...some log details");
StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
.forEach(logEntry -> {
System.out.println("------------------");
logEntry.forEach(System.out::println);
});
}
// Output
// ------------------
// Start of log entry 1
// ...some log details
// ...some log details
// ------------------
// Start of log entry 2
// ...some log details
// ...some log details
// ------------------
// Start of log entry 3
// ...some log details
// ...some log details
迭代器总是向前看一行。一旦该 lline 是新条目的开头,它就会将前一个条目包装在流中,并将其 return 转换为 next
。工厂方法 streamOf
将此迭代器转换为流,以便在我上面给出的示例中使用。
我将拆分条件从正则表达式更改为 Predicate
,因此您可以借助多个正则表达式指定更复杂的条件,if-conditions,等等。
请注意,我只使用上面的示例数据对其进行了测试,因此我不知道它在更复杂、错误或空输入时的表现如何。