使用 Java 流上的过滤操作结果调用函数的最佳方法是什么

What is the best way to call a function with the result of a filter operation on a Java stream

我想知道使用新的 Java 8 流 API 读取文件的最佳方法是什么,通过模式匹配过滤输入流,然后将结果传递给从匹配中消耗组的函数。我预计这是不可能的,因为流操作是无状态的,模式匹配器中的组无法传递到流处理器中的下一步。

所以,假设我有这个

stream.filter(s -> pattern.matcher(s).matches())

是否可以向接受 s 或最好是来自 matches() 的组的函数添加标注?像

stream.filter(s -> pattern.matcher(s).matches()).ifTrue(s -> myfunc(s))

我基本上想避免将所有匹配项收集到内存中的数据结构中,因为我不知道我的流有多大。假设我有一个大文件,我想构建并序列化一个封装模式匹配器组的对象。我不想将所有匹配项保存在一个数据结构中并强制处理,那么对文件进行操作的最佳方法是什么,这样我就可以处理一行并限制我的内存消耗,同时还允许文件并行处理?

遍历这些行并一次处理这些行是否更好?这是否等同于流上 forEach() 的终端操作?在我的用例中,我可能不关心无法在终端操作上并行化的副作用,但我很好奇在并行处理期间 forEach 会出现问题的一般情况。

谢谢。

您可以在直播中peek

Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.

This is an intermediate operation.

流是完整的,但您可以使用 lambda 表达式(Consumer)执行您的操作。

stream.filter(s -> pattern.matcher(s).matches()).peek(s -> myfunc(s))

你的要求有点奇怪。如果您调用 matches,则表示您的整个 String 匹配,因此元素 匹配项。

而且在流上请求 .ifTrue(s -> myfunc(s)) 方法是没有意义的,实际上,这让我想知道你认为 filter 做了什么。如果您在 filter 之后链接一个动作,它将仅应用于匹配项,因为这是 filter 的目的。

所以 stream.filter(s -> pattern.matcher(s).matches()).forEach(s -> myfunc(s)) 是完成这项工作的正确工具,它既不意味着数据收集也不意味着多线程问题。如果您的函数 myfunc 本身没有线程问题,filter(…).forEach(…) 可以 运行 并行而不会出现问题。


值得注意的是 Pattern 有一个 asPredicate 方法。它使用 find 而不是 matches,但这可以通过将锚点添加到模式来解决:

Stream.of("a", "b", "ab", "bb", "aaa", "bab")
      .filter(Pattern.compile("^a*$").asPredicate())
      .forEach(System.out::println);

将打印

a
aaa

也将与 parallel 一起使用(只是顺序可能会改变)。


如果你真的需要Matcher的状态,你必须选择。

  1. 先创建 Matcher 然后应用 filter:

    Pattern p=Pattern.compile("b(a+)b");
    Stream.of("a", "b", "bab", "bb", "aa", "baaab")//.parallel()
          .map(p::matcher)
          .filter(Matcher::matches)
          .mapToInt(m->m.end(1)-m.start(1))
          .forEach(System.out::println);
    
  2. 使用flatMap,用结果Stream

    表示映射和过滤的组合结果
    Pattern p=Pattern.compile("b(a+)b");
    Stream.of("a", "b", "bab", "bb", "aa", "baaab")//.parallel()
          .flatMap(s-> { Matcher m=p.matcher(s);
              return m.matches()? Stream.of(m.group(1)): Stream.empty(); })
          .mapToInt(String::length)
          .forEach(System.out::println);
    

两者都将打印 13 并且对于 parallel 执行是安全的;在并行流中使用 forEach 可能会改变顺序,仅此而已。如果您对源顺序感兴趣,可以使用forEachOrdered