Java split stream by predicate into stream of streams

I have hundreds of large (6 GB) gzipped log files that I'm reading with a GZIPInputStream and want to parse. Suppose each one has the format:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

I'm streaming the gzipped file contents line by line via BufferedReader.lines(). The stream looks like:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]

The start of each log entry can be identified by the predicate: line -> line.startsWith("Start of log entry"). I would like to transform this Stream<String> into a Stream<Stream<String>> according to that predicate. Each "substream" should start when the predicate is true and collect lines while the predicate is false, until the predicate is next true, which marks the end of that substream and the start of the next. The result would look like:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

From there, I can take each substream and map it to new LogEntry(Stream<String> logLines), aggregating the related log lines into LogEntry objects.
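
For reference, a minimal sketch of what LogEntry might look like (purely hypothetical; it just materializes the grouped lines, the real class is omitted here):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch only -- the actual LogEntry class is not shown in this question.
class LogEntry {
    private final List<String> lines;

    // Aggregates the lines of one substream into a single entry.
    LogEntry(Stream<String> logLines) {
        this.lines = logLines.collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return String.join(System.lineSeparator(), lines);
    }
}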

Here's a rough sketch of the idea:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
        = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream ); 
             BufferedReader reader = new BufferedReader( inputStreamReader )) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

Constraint: I have hundreds of these large files to process in parallel (but only a single sequential stream per file), which makes loading them fully into memory (e.g. by storing them as List<String> lines) infeasible.
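
To make that constraint concrete, here is a rough sketch of the intended outer per-file parallelism (the splitting step is still the open question, and the helper names are placeholders): each file is opened and read sequentially, but many files are processed in parallel.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.GZIPInputStream;

class ParallelFilesSketch {
    // Processes many files in parallel; each individual file is read as one sequential stream.
    static void processAll(List<Path> gzipFiles) {
        gzipFiles.parallelStream().forEach(ParallelFilesSketch::processOne);
    }

    static void processOne(Path gzipFile) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzipFile)), StandardCharsets.UTF_8))) {
            reader.lines()
                  // .splitByPredicate(isLogEntryStart)  // the missing piece this question asks about
                  .forEach(System.out::println);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}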

Any help is appreciated!

I think the main problem is that you're reading line by line and trying to create a LogEntry instance out of the lines, instead of reading chunk by chunk (where a chunk may cover many lines).

For that, you can use Scanner.findAll (available since Java 9) with an appropriate regex:

String input =
        "Start of log entry 1\n"        +
        "    ...some log details 1.1\n" +
        "    ...some log details 1.2\n" +
        "    ...some log details 1.3\n" +
        "Start of log entry 2\n"        +
        "    ...some log details 2.1\n" +
        "    ...some log details 2.2\n" +
        "    ...some log details 2.3\n" +
        "Start of log entry 3\n"        +
        "    ...some log details 3.1\n" +
        "    ...some log details 3.2\n" +
        "    ...some log details 3.3";

try (ByteArrayInputStream gzip = 
         new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
     InputStreamReader reader = new InputStreamReader(gzip);
     Scanner scanner = new Scanner(reader)) {

    String START = "Start of log entry \d+";
    Pattern pattern = Pattern.compile(
            START + "(?<=" + START + ").*?(?=" + START + "|$)", 
            Pattern.DOTALL);

    scanner.findAll(pattern)
            .map(MatchResult::group)
            .map(s -> s.split("\R"))
            .map(LogEntry::new)
            .forEach(System.out::println);

} catch (IOException e) {
    throw new UncheckedIOException(e);
}

So this works by lazily finding matches within the Scanner instance. Scanner.findAll returns a Stream<MatchResult>, and MatchResult.group() returns the matched String. We then split that string on line breaks (\R), which yields a String[] whose elements are the individual lines. Then, assuming LogEntry has a constructor that accepts a String[] argument, we transform each of these arrays into a LogEntry instance. Finally, assuming LogEntry has an overridden toString() method, we print each LogEntry instance to the output.
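
To make the shape of the pipeline explicit, here is the same chain again with the element type at each stage spelled out as comments (the LogEntry constructor and toString() are the assumptions mentioned above, nothing new is introduced):

scanner.findAll(pattern)              // Stream<MatchResult> - one match per log entry, found lazily
        .map(MatchResult::group)      // Stream<String>      - the full text of each entry
        .map(s -> s.split("\\R"))     // Stream<String[]>    - the entry's individual lines
        .map(LogEntry::new)           // Stream<LogEntry>    - assumes a LogEntry(String[] lines) constructor
        .forEach(System.out::println); // prints via the assumed toString() override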

It's worth mentioning that the Scanner only starts doing its work when forEach is invoked on the stream.

Another note concerns the regex we're using to match log entries in the input. I'm no expert in the regex field, so I'm almost sure there's room for improvement here. First, we use Pattern.DOTALL so that . matches not only ordinary characters but also line breaks. Then there's the actual regex. The idea is that it matches and consumes Start of log entry \d+, then uses a look-behind against Start of log entry \d+, then consumes the characters of the input in a non-greedy way (that's the .*? part), and finally looks ahead to check whether there's another occurrence of Start of log entry \d+ or the end of the input has been reached. If you'd like to dive into this subject, please see this amazing article about regular expressions.
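
For readability, here is the same pattern again with each building block annotated (this is only a restatement of the Pattern.compile call above, with the same imports):

String START = "Start of log entry \\d+";
Pattern pattern = Pattern.compile(
        START                       // match and consume one entry header
        + "(?<=" + START + ")"      // look-behind: we are positioned right after a header
        + ".*?"                     // reluctantly consume the entry body (DOTALL lets '.' cross lines)
        + "(?=" + START + "|$)",    // stop just before the next header, or at the end of input
        Pattern.DOTALL);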


If you're not on Java 9+, I'm not aware of a similar alternative. What you could do, though, is create a custom Spliterator that wraps the Spliterator of the stream returned by BufferedReader.lines() and adds the desired parsing behavior to it. Then you'd need to create a new Stream out of that Spliterator. Not trivial at all...
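
To give an idea of what that could look like, here is a rough, untested sketch of such a wrapping Spliterator (not part of this answer, just an illustration of the idea; it buffers one entry at a time, never the whole file):

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Sketch only: groups a flat stream of lines into one Stream<String> per log entry.
class GroupingSpliterator extends Spliterators.AbstractSpliterator<Stream<String>> {

    private final Spliterator<String> source;     // spliterator of the flat line stream
    private final Predicate<String> isEntryStart; // identifies the first line of an entry
    private String pending;                       // entry-start line already read but not yet emitted

    GroupingSpliterator(Spliterator<String> source, Predicate<String> isEntryStart) {
        super(Long.MAX_VALUE, ORDERED | NONNULL);
        this.source = source;
        this.isEntryStart = isEntryStart;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Stream<String>> action) {
        List<String> entry = new ArrayList<>();
        if (pending != null) {          // start with the header line read on the previous call
            entry.add(pending);
            pending = null;
        }
        // Keep pulling lines until the next header shows up (stored in 'pending') or input ends.
        while (pending == null && source.tryAdvance(line -> {
            if (isEntryStart.test(line) && !entry.isEmpty()) {
                pending = line;         // belongs to the next entry
            } else {
                entry.add(line);
            }
        })) {
            // intentionally empty: all the work happens in the lambda above
        }
        if (entry.isEmpty()) {
            return false;               // no more entries
        }
        action.accept(entry.stream());
        return true;
    }

    // Wraps a flat stream of lines into a stream of per-entry streams.
    static Stream<Stream<String>> entries(Stream<String> lines, Predicate<String> isEntryStart) {
        return StreamSupport.stream(
                new GroupingSpliterator(lines.spliterator(), isEntryStart), false);
    }
}

It could then be used as GroupingSpliterator.entries(reader.lines(), isLogEntryStart) to obtain the desired Stream<Stream<String>>.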

Frederico's answer is probably the nicest possible way to solve this particular problem. Following his last thought about a custom Spliterator, I'll add an adapted version of an answer I gave to a similar question, where I proposed using a custom iterator to create a chunked stream. That approach also works on other streams that are not created by input readers.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T>  incoming;
    private Predicate<T> startOfNewEntry;
    private T            nextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        if (incoming.hasNext())
            nextLine = incoming.next();
    }

    @Override
    public boolean hasNext()
    {
        return nextLine != null;
    }

    @Override
    public Stream<T> next()
    {
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
        } while (incoming.hasNext()
                 && !startOfNewEntry.test((nextLine = incoming.next())));

        if (!startOfNewEntry.test(nextLine)) // incoming does not have next
            nextLine = null;

        return nextEntrysLines.stream();
    }
}

Example

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

The iterator always looks one line ahead. As soon as that line is the start of a new entry, it wraps up the previous entry in a stream and returns it from next. The factory method streamOf turns this iterator into a stream for use as in the example I gave above.

I changed the split condition from a regex to a Predicate, so that you can specify more complicated conditions with the help of multiple regexes, if-conditions, and so on.
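
For instance, a split condition combining two checks (both patterns are made up for illustration) could look like this:

import java.util.function.Predicate;
import java.util.stream.Stream;

public class SplitConditionDemo {
    public static void main(String[] args) {
        Stream<String> lines = Stream.of(
                "Start of log entry 1",
                "    ...some log details",
                "FATAL something unexpected",   // hypothetical second kind of entry start
                "    ...stack trace");

        // Combine several checks into a single split condition.
        Predicate<String> startsEntry = line -> line.matches("Start of log entry \\d+");
        Predicate<String> startsFatal = line -> line.startsWith("FATAL");
        Predicate<String> splitCondition = startsEntry.or(startsFatal);

        StreamSplitter.streamOf(lines, splitCondition)
                      .forEach(entry -> {
                          System.out.println("------------------");
                          entry.forEach(System.out::println);
                      });
    }
}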

Note that I've only tested this with the example data above, so I don't know how it behaves with more sophisticated, erroneous, or empty input.